Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 70 additions & 64 deletions xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.grpc.NameResolver;
import io.grpc.NameResolver.ResolutionResult;
import io.grpc.Status;
import io.grpc.StatusOr;
import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.BackoffPolicy;
Expand Down Expand Up @@ -657,79 +658,84 @@

@Override
public void onResult(final ResolutionResult resolutionResult) {
class NameResolved implements Runnable {
@Override
public void run() {
if (shutdown) {
return;
}
backoffPolicy = null; // reset backoff sequence if succeeded
// Arbitrary priority notation for all DNS-resolved endpoints.
String priorityName = priorityName(name, 0); // value doesn't matter
List<EquivalentAddressGroup> addresses = new ArrayList<>();
for (EquivalentAddressGroup eag : resolutionResult.getAddresses()) {
// No weight attribute is attached, all endpoint-level LB policy should be able
// to handle such it.
String localityName = localityName(LOGICAL_DNS_CLUSTER_LOCALITY);
Attributes attr = eag.getAttributes().toBuilder()
.set(XdsAttributes.ATTR_LOCALITY, LOGICAL_DNS_CLUSTER_LOCALITY)
.set(XdsAttributes.ATTR_LOCALITY_NAME, localityName)
.set(XdsAttributes.ATTR_ADDRESS_NAME, dnsHostName)
.build();
eag = new EquivalentAddressGroup(eag.getAddresses(), attr);
eag = AddressFilter.setPathFilter(eag, Arrays.asList(priorityName, localityName));
addresses.add(eag);
}
PriorityChildConfig priorityChildConfig = generateDnsBasedPriorityChildConfig(
name, lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata,
lbRegistry, Collections.<DropOverload>emptyList());
status = Status.OK;
resolved = true;
result = new ClusterResolutionResult(addresses, priorityName, priorityChildConfig);
handleEndpointResourceUpdate();
syncContext.execute(() -> onResult2(resolutionResult));
}

@Override
public Status onResult2(final ResolutionResult resolutionResult) {
if (shutdown) {
return Status.OK;

Check warning on line 667 in xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java#L667

Added line #L667 was not covered by tests
}
// Arbitrary priority notation for all DNS-resolved endpoints.
String priorityName = priorityName(name, 0); // value doesn't matter
List<EquivalentAddressGroup> addresses = new ArrayList<>();
StatusOr<List<EquivalentAddressGroup>> addressesOrError =
resolutionResult.getAddressesOrError();
if (addressesOrError.hasValue()) {
backoffPolicy = null; // reset backoff sequence if succeeded
for (EquivalentAddressGroup eag : resolutionResult.getAddresses()) {
// No weight attribute is attached, all endpoint-level LB policy should be able
// to handle such it.
String localityName = localityName(LOGICAL_DNS_CLUSTER_LOCALITY);
Attributes attr = eag.getAttributes().toBuilder()
.set(XdsAttributes.ATTR_LOCALITY, LOGICAL_DNS_CLUSTER_LOCALITY)
.set(XdsAttributes.ATTR_LOCALITY_NAME, localityName)
.set(XdsAttributes.ATTR_ADDRESS_NAME, dnsHostName)
.build();
eag = new EquivalentAddressGroup(eag.getAddresses(), attr);
eag = AddressFilter.setPathFilter(eag, Arrays.asList(priorityName, localityName));
addresses.add(eag);
}
PriorityChildConfig priorityChildConfig = generateDnsBasedPriorityChildConfig(
name, lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata,
lbRegistry, Collections.<DropOverload>emptyList());
status = Status.OK;
resolved = true;
result = new ClusterResolutionResult(addresses, priorityName, priorityChildConfig);
handleEndpointResourceUpdate();
return Status.OK;
} else {
handleErrorInSyncContext(addressesOrError.getStatus());
return addressesOrError.getStatus();
}

syncContext.execute(new NameResolved());
}

@Override
public void onError(final Status error) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (shutdown) {
return;
}
status = error;
// NameResolver.Listener API cannot distinguish between address-not-found and
// transient errors. If the error occurs in the first resolution, treat it as
// address not found. Otherwise, either there is previously resolved addresses
// previously encountered error, propagate the error to downstream/upstream and
// let downstream/upstream handle it.
if (!resolved) {
resolved = true;
handleEndpointResourceUpdate();
} else {
handleEndpointResolutionError();
}
if (scheduledRefresh != null && scheduledRefresh.isPending()) {
return;
}
if (backoffPolicy == null) {
backoffPolicy = backoffPolicyProvider.get();
}
long delayNanos = backoffPolicy.nextBackoffNanos();
logger.log(XdsLogLevel.DEBUG,
syncContext.execute(() -> handleErrorInSyncContext(error));
}

private void handleErrorInSyncContext(final Status error) {
if (shutdown) {
return;

Check warning on line 710 in xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java#L710

Added line #L710 was not covered by tests
}
status = error;
// NameResolver.Listener API cannot distinguish between address-not-found and
// transient errors. If the error occurs in the first resolution, treat it as
// address not found. Otherwise, either there is previously resolved addresses
// previously encountered error, propagate the error to downstream/upstream and
// let downstream/upstream handle it.
if (!resolved) {
resolved = true;
handleEndpointResourceUpdate();
} else {
handleEndpointResolutionError();
}
if (scheduledRefresh != null && scheduledRefresh.isPending()) {
return;

Check warning on line 725 in xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java#L725

Added line #L725 was not covered by tests
}
if (backoffPolicy == null) {
backoffPolicy = backoffPolicyProvider.get();
}
long delayNanos = backoffPolicy.nextBackoffNanos();
logger.log(XdsLogLevel.DEBUG,
"Logical DNS resolver for cluster {0} encountered name resolution "
+ "error: {1}, scheduling DNS resolution backoff for {2} ns",
+ "error: {1}, scheduling DNS resolution backoff for {2} ns",
name, error, delayNanos);
scheduledRefresh =
scheduledRefresh =
syncContext.schedule(
new DelayedNameResolverRefresh(), delayNanos, TimeUnit.NANOSECONDS,
timeService);
}
});
new DelayedNameResolverRefresh(), delayNanos, TimeUnit.NANOSECONDS,
timeService);
}
}
}
Expand Down
83 changes: 68 additions & 15 deletions xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ public XdsClient returnObject(Object object) {
private ArgumentCaptor<SubchannelPicker> pickerCaptor;
private int xdsClientRefs;
private ClusterResolverLoadBalancer loadBalancer;
private NameResolverProvider fakeNameResolverProvider;

@Before
public void setUp() throws URISyntaxException {
Expand All @@ -216,7 +217,8 @@ public void setUp() throws URISyntaxException {
.setServiceConfigParser(mock(ServiceConfigParser.class))
.setChannelLogger(mock(ChannelLogger.class))
.build();
nsRegistry.register(new FakeNameResolverProvider());
fakeNameResolverProvider = new FakeNameResolverProvider(false);
nsRegistry.register(fakeNameResolverProvider);
when(helper.getNameResolverRegistry()).thenReturn(nsRegistry);
when(helper.getNameResolverArgs()).thenReturn(args);
when(helper.getSynchronizationContext()).thenReturn(syncContext);
Expand Down Expand Up @@ -826,6 +828,17 @@ public void handleEdsResource_noHealthyEndpoint() {

@Test
public void onlyLogicalDnsCluster_endpointsResolved() {
do_onlyLogicalDnsCluster_endpointsResolved();
}

@Test
public void oldListenerCallback_onlyLogicalDnsCluster_endpointsResolved() {
nsRegistry.deregister(fakeNameResolverProvider);
nsRegistry.register(new FakeNameResolverProvider(true));
do_onlyLogicalDnsCluster_endpointsResolved();
}

void do_onlyLogicalDnsCluster_endpointsResolved() {
ClusterResolverConfig config = new ClusterResolverConfig(
Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin, false);
deliverLbConfig(config);
Expand Down Expand Up @@ -854,7 +867,6 @@ public void onlyLogicalDnsCluster_endpointsResolved() {
.get(XdsAttributes.ATTR_ADDRESS_NAME)).isEqualTo(DNS_HOST_NAME);
assertThat(childBalancer.addresses.get(1).getAttributes()
.get(XdsAttributes.ATTR_ADDRESS_NAME)).isEqualTo(DNS_HOST_NAME);

}

@Test
Expand All @@ -874,37 +886,48 @@ public void onlyLogicalDnsCluster_handleRefreshNameResolution() {
}

@Test
public void onlyLogicalDnsCluster_resolutionError_backoffAndRefresh() {
public void resolutionError_backoffAndRefresh() {
do_onlyLogicalDnsCluster_resolutionError_backoffAndRefresh();
}

@Test
public void oldListenerCallback_resolutionError_backoffAndRefresh() {
nsRegistry.deregister(fakeNameResolverProvider);
nsRegistry.register(new FakeNameResolverProvider(true));
do_onlyLogicalDnsCluster_resolutionError_backoffAndRefresh();
}

void do_onlyLogicalDnsCluster_resolutionError_backoffAndRefresh() {
InOrder inOrder = Mockito.inOrder(helper, backoffPolicyProvider,
backoffPolicy1, backoffPolicy2);
backoffPolicy1, backoffPolicy2);
ClusterResolverConfig config = new ClusterResolverConfig(
Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin, false);
Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin, false);
deliverLbConfig(config);
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
assertThat(childBalancers).isEmpty();
Status error = Status.UNAVAILABLE.withDescription("cannot reach DNS server");
resolver.deliverError(error);
inOrder.verify(helper).updateBalancingState(
eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
assertPicker(pickerCaptor.getValue(), error, null);
assertThat(resolver.refreshCount).isEqualTo(0);
inOrder.verify(backoffPolicyProvider).get();
inOrder.verify(backoffPolicy1).nextBackoffNanos();
assertThat(fakeClock.getPendingTasks()).hasSize(1);
assertThat(Iterables.getOnlyElement(fakeClock.getPendingTasks()).getDelay(TimeUnit.SECONDS))
.isEqualTo(1L);
.isEqualTo(1L);
fakeClock.forwardTime(1L, TimeUnit.SECONDS);
assertThat(resolver.refreshCount).isEqualTo(1);

error = Status.UNKNOWN.withDescription("I am lost");
resolver.deliverError(error);
inOrder.verify(helper).updateBalancingState(
eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
inOrder.verify(backoffPolicy1).nextBackoffNanos();
assertPicker(pickerCaptor.getValue(), error, null);
assertThat(fakeClock.getPendingTasks()).hasSize(1);
assertThat(Iterables.getOnlyElement(fakeClock.getPendingTasks()).getDelay(TimeUnit.SECONDS))
.isEqualTo(10L);
.isEqualTo(10L);
fakeClock.forwardTime(10L, TimeUnit.SECONDS);
assertThat(resolver.refreshCount).isEqualTo(2);

Expand All @@ -914,7 +937,7 @@ public void onlyLogicalDnsCluster_resolutionError_backoffAndRefresh() {
resolver.deliverEndpointAddresses(Arrays.asList(endpoint1, endpoint2));
assertThat(childBalancers).hasSize(1);
assertAddressesEqual(Arrays.asList(endpoint1, endpoint2),
Iterables.getOnlyElement(childBalancers).addresses);
Iterables.getOnlyElement(childBalancers).addresses);

assertThat(fakeClock.getPendingTasks()).isEmpty();
inOrder.verifyNoMoreInteractions();
Expand Down Expand Up @@ -1319,10 +1342,18 @@ void deliverError(Status error) {
}

private class FakeNameResolverProvider extends NameResolverProvider {
private final boolean useOldListenerCallback;

private FakeNameResolverProvider(boolean useOldListenerCallback) {
this.useOldListenerCallback = useOldListenerCallback;
}

@Override
public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) {
assertThat(targetUri.getScheme()).isEqualTo("dns");
FakeNameResolver resolver = new FakeNameResolver(targetUri);
FakeNameResolver resolver = useOldListenerCallback
? new FakeNameResolverUsingOldListenerCallback(targetUri)
: new FakeNameResolver(targetUri);
resolvers.add(resolver);
return resolver;
}
Expand All @@ -1343,9 +1374,10 @@ protected int priority() {
}
}


private class FakeNameResolver extends NameResolver {
private final URI targetUri;
private Listener2 listener;
protected Listener2 listener;
private int refreshCount;

private FakeNameResolver(URI targetUri) {
Expand All @@ -1372,12 +1404,33 @@ public void shutdown() {
resolvers.remove(this);
}

private void deliverEndpointAddresses(List<EquivalentAddressGroup> addresses) {
protected void deliverEndpointAddresses(List<EquivalentAddressGroup> addresses) {
syncContext.execute(() -> {
Status ret = listener.onResult2(ResolutionResult.newBuilder()
.setAddressesOrError(StatusOr.fromValue(addresses)).build());
assertThat(ret.getCode()).isEqualTo(Status.Code.OK);
});
}

protected void deliverError(Status error) {
syncContext.execute(() -> listener.onResult2(ResolutionResult.newBuilder()
.setAddressesOrError(StatusOr.fromStatus(error)).build()));
}
}

private class FakeNameResolverUsingOldListenerCallback extends FakeNameResolver {
private FakeNameResolverUsingOldListenerCallback(URI targetUri) {
super(targetUri);
}

@Override
protected void deliverEndpointAddresses(List<EquivalentAddressGroup> addresses) {
listener.onResult(ResolutionResult.newBuilder()
.setAddressesOrError(StatusOr.fromValue(addresses)).build());
.setAddressesOrError(StatusOr.fromValue(addresses)).build());
}

private void deliverError(Status error) {
@Override
protected void deliverError(Status error) {
listener.onError(error);
}
}
Expand Down