From ef37158f078ee37e0888ddbd29ea30349a88a2c8 Mon Sep 17 00:00:00 2001 From: Megrez Lu Date: Tue, 4 Jun 2024 19:13:38 +0800 Subject: [PATCH 01/12] add test case without code change Signed-off-by: Megrez Lu --- .../grpc/xds/GrpcXdsClientImplTestBase.java | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java index fd276a849ce..9b00013ecb6 100644 --- a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java +++ b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java @@ -2614,6 +2614,58 @@ public void cdsResourceDeleted() { verifySubscribedResourcesMetadataSizes(0, 1, 0, 0); } + @Test + public void cdsResourcesDelete_edsUnsubscribed() { + Assume.assumeFalse(ignoreResourceDeletion()); + + List subscribedResourceNames = ImmutableList.of("A"); + xdsClient.watchXdsResource(XdsClusterResource.getInstance(), "A", cdsResourceWatcher); + xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), "A.1", edsResourceWatcher); + DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); + assertThat(call).isNotNull(); + verifyResourceMetadataRequested(CDS, "A"); + verifyResourceMetadataRequested(EDS, "A.1"); + verifySubscribedResourcesMetadataSizes(0, 1, 0, 1); + + // CDS -> {A}, version 1 + ImmutableMap resourcesV1 = ImmutableMap.of( + "A", Any.pack(mf.buildEdsCluster("A", "A.1", "round_robin", null, null, false, null, + "envoy.transport_sockets.tls", null, null + ))); + call.sendResponse(CDS, resourcesV1.values().asList(), VERSION_1, "0000"); + // {A, B} -> ACK, version 1 + verifyResourceMetadataAcked(CDS, "A", resourcesV1.get("A"), VERSION_1, TIME_INCREMENT); + call.verifyRequest(CDS, subscribedResourceNames, VERSION_1, "0000", NODE); + + // EDS -> {A.1}, version 1 + List dropOverloads = ImmutableList.of(); + List endpointsV1 = ImmutableList.of(lbEndpointHealthy); + ImmutableMap resourcesV11 = ImmutableMap.of( + "A.1", Any.pack(mf.buildClusterLoadAssignment("A.1", endpointsV1, dropOverloads))); + call.sendResponse(EDS, resourcesV11.values().asList(), VERSION_1, "0000"); + // {A.1} -> ACK, version 1 + verifyResourceMetadataAcked(EDS, "A.1", resourcesV11.get("A.1"), VERSION_1, TIME_INCREMENT * 2); + verify(cdsResourceWatcher, times(1)).onChanged(any()); + + // Empty CDS response deletes the cluster. + call.sendResponse(CDS, Collections.emptyList(), VERSION_2, "0001"); + call.verifyRequest(CDS, "A", VERSION_2, "0001", NODE); + verify(cdsResourceWatcher).onResourceDoesNotExist("A"); + verifyResourceMetadataDoesNotExist(CDS, "A"); + verifySubscribedResourcesMetadataSizes(0, 1, 0, 1); + // Empty CDS leads to EDS resource "A.1" unsubscribed. + xdsClient.cancelXdsResourceWatch(XdsEndpointResource.getInstance(), "A.1", edsResourceWatcher); + verifySubscribedResourcesMetadataSizes(0, 1, 0, 0); + + // Send any EDS will not trigger any ACK/NACK response + Any updatedClusterLoadAssignment = Any.pack(mf.buildClusterLoadAssignment("A.1", + ImmutableList.of(mf.buildLocalityLbEndpoints("region2", "zone2", "subzone2", + mf.buildLbEndpoint("172.44.2.2", 8000, "unknown", 3), 2, 0)), + ImmutableList.of())); + call.sendResponse(EDS, updatedClusterLoadAssignment, VERSION_2, "0001"); + call.verifyNoMoreRequest(); + } + /** * When ignore_resource_deletion server feature is on, xDS client should keep the deleted cluster * on empty response, and resume the normal work when CDS contains the cluster again. From 133de5e58fbb2d88b0cdd5bd823e435102755b13 Mon Sep 17 00:00:00 2001 From: Megrez Lu Date: Wed, 5 Jun 2024 16:23:13 +0800 Subject: [PATCH 02/12] modify test case Signed-off-by: Megrez Lu --- .../io/grpc/xds/client/XdsClientImpl.java | 3 +- .../grpc/xds/GrpcXdsClientImplTestBase.java | 91 ++++++++----------- 2 files changed, 39 insertions(+), 55 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java index 969660bf7d4..499eecdd035 100644 --- a/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java @@ -281,7 +281,7 @@ public void cancelXdsResourceWatch(XdsResourceType @SuppressWarnings("unchecked") public void run() { ResourceSubscriber subscriber = - (ResourceSubscriber) resourceSubscribers.get(type).get(resourceName);; + (ResourceSubscriber) resourceSubscribers.get(type).get(resourceName); subscriber.removeWatcher(watcher); if (!subscriber.isWatched()) { subscriber.cancelResourceWatch(); @@ -291,7 +291,6 @@ public void run() { } if (resourceSubscribers.get(type).isEmpty()) { resourceSubscribers.remove(type); - subscribedResourceTypeUrls.remove(type.typeUrl()); } } } diff --git a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java index 9b00013ecb6..19877ddcb41 100644 --- a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java +++ b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java @@ -428,7 +428,6 @@ private void verifyResourceCount( XdsResourceType type, int size) { if (size == 0) { - assertThat(subscribedTypeUrls.containsKey(type.typeUrl())).isFalse(); assertThat(subscribedResourcesMetadata.containsKey(type)).isFalse(); } else { assertThat(subscribedTypeUrls.containsKey(type.typeUrl())).isTrue(); @@ -2614,58 +2613,6 @@ public void cdsResourceDeleted() { verifySubscribedResourcesMetadataSizes(0, 1, 0, 0); } - @Test - public void cdsResourcesDelete_edsUnsubscribed() { - Assume.assumeFalse(ignoreResourceDeletion()); - - List subscribedResourceNames = ImmutableList.of("A"); - xdsClient.watchXdsResource(XdsClusterResource.getInstance(), "A", cdsResourceWatcher); - xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), "A.1", edsResourceWatcher); - DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); - assertThat(call).isNotNull(); - verifyResourceMetadataRequested(CDS, "A"); - verifyResourceMetadataRequested(EDS, "A.1"); - verifySubscribedResourcesMetadataSizes(0, 1, 0, 1); - - // CDS -> {A}, version 1 - ImmutableMap resourcesV1 = ImmutableMap.of( - "A", Any.pack(mf.buildEdsCluster("A", "A.1", "round_robin", null, null, false, null, - "envoy.transport_sockets.tls", null, null - ))); - call.sendResponse(CDS, resourcesV1.values().asList(), VERSION_1, "0000"); - // {A, B} -> ACK, version 1 - verifyResourceMetadataAcked(CDS, "A", resourcesV1.get("A"), VERSION_1, TIME_INCREMENT); - call.verifyRequest(CDS, subscribedResourceNames, VERSION_1, "0000", NODE); - - // EDS -> {A.1}, version 1 - List dropOverloads = ImmutableList.of(); - List endpointsV1 = ImmutableList.of(lbEndpointHealthy); - ImmutableMap resourcesV11 = ImmutableMap.of( - "A.1", Any.pack(mf.buildClusterLoadAssignment("A.1", endpointsV1, dropOverloads))); - call.sendResponse(EDS, resourcesV11.values().asList(), VERSION_1, "0000"); - // {A.1} -> ACK, version 1 - verifyResourceMetadataAcked(EDS, "A.1", resourcesV11.get("A.1"), VERSION_1, TIME_INCREMENT * 2); - verify(cdsResourceWatcher, times(1)).onChanged(any()); - - // Empty CDS response deletes the cluster. - call.sendResponse(CDS, Collections.emptyList(), VERSION_2, "0001"); - call.verifyRequest(CDS, "A", VERSION_2, "0001", NODE); - verify(cdsResourceWatcher).onResourceDoesNotExist("A"); - verifyResourceMetadataDoesNotExist(CDS, "A"); - verifySubscribedResourcesMetadataSizes(0, 1, 0, 1); - // Empty CDS leads to EDS resource "A.1" unsubscribed. - xdsClient.cancelXdsResourceWatch(XdsEndpointResource.getInstance(), "A.1", edsResourceWatcher); - verifySubscribedResourcesMetadataSizes(0, 1, 0, 0); - - // Send any EDS will not trigger any ACK/NACK response - Any updatedClusterLoadAssignment = Any.pack(mf.buildClusterLoadAssignment("A.1", - ImmutableList.of(mf.buildLocalityLbEndpoints("region2", "zone2", "subzone2", - mf.buildLbEndpoint("172.44.2.2", 8000, "unknown", 3), 2, 0)), - ImmutableList.of())); - call.sendResponse(EDS, updatedClusterLoadAssignment, VERSION_2, "0001"); - call.verifyNoMoreRequest(); - } - /** * When ignore_resource_deletion server feature is on, xDS client should keep the deleted cluster * on empty response, and resume the normal work when CDS contains the cluster again. @@ -2809,6 +2756,44 @@ public void edsResourceNotFound() { verifySubscribedResourcesMetadataSizes(0, 0, 0, 1); } + @Test + public void edsAllowRespondAfterUnsubscription() { + Assume.assumeFalse(ignoreResourceDeletion()); + + // Suppose we have an EDS subscription A.1 + List subscribedResourceNames = ImmutableList.of("A.1"); + xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), "A.1", edsResourceWatcher); + DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); + assertThat(call).isNotNull(); + verifyResourceMetadataRequested(EDS, "A.1"); + verifySubscribedResourcesMetadataSizes(0, 0, 0, 1); + + // EDS -> {A.1}, version 1 + List dropOverloads = ImmutableList.of(); + List endpointsV1 = ImmutableList.of(lbEndpointHealthy); + ImmutableMap resourcesV1 = ImmutableMap.of( + "A.1", Any.pack(mf.buildClusterLoadAssignment("A.1", endpointsV1, dropOverloads))); + call.sendResponse(EDS, resourcesV1.values().asList(), VERSION_1, "0000"); + // {A.1} -> ACK, version 1 + verifyResourceMetadataAcked(EDS, "A.1", resourcesV1.get("A.1"), VERSION_1, TIME_INCREMENT); + verify(edsResourceWatcher, times(1)).onChanged(any()); + + // trigger an EDS resource unsubscription. This would probably be caused by CDS PUSH(let's say event e1) in the real world. + // Then there can be a potential data race between + // 1) the EDS unsubscription caused by CDS PUSH e1 (client-side) and, + // 2) the immediate EDS PUSH from XdsServer (server-side) after CDS PUSH e1 (event e2). + xdsClient.cancelXdsResourceWatch(XdsEndpointResource.getInstance(), "A.1", edsResourceWatcher); + verifySubscribedResourcesMetadataSizes(0, 0, 0, 0); + // An EDS PUSH after CDS PUSH e1. + List endpointsV2 = ImmutableList.of(lbEndpointHealthy); + ImmutableMap resourcesV2 = ImmutableMap.of( + "A.1", Any.pack(mf.buildClusterLoadAssignment("A.1", endpointsV2, dropOverloads))); + call.sendResponse(EDS, resourcesV2.values().asList(), VERSION_2, "0001"); + // This will send an empty resource list to the XdsServer + call.verifyRequest(EDS, Collections.emptyList(), VERSION_2, "0001", NODE); + verifyNoMoreInteractions(edsResourceWatcher); + } + @Test public void edsResponseErrorHandling_allResourcesFailedUnpack() { DiscoveryRpcCall call = startResourceWatcher(XdsEndpointResource.getInstance(), EDS_RESOURCE, From 037cdef81b73e6a81f3218b976e06bf5af2fc535 Mon Sep 17 00:00:00 2001 From: Megrez Lu Date: Wed, 5 Jun 2024 16:46:56 +0800 Subject: [PATCH 03/12] fix empty resource subscription --- xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java | 4 +--- xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java | 4 +++- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java b/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java index 761c10ede6a..c3cea9f52f3 100644 --- a/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java +++ b/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java @@ -152,9 +152,7 @@ void adjustResourceSubscription(XdsResourceType resourceType) { startRpcStream(); } Collection resources = resourceStore.getSubscribedResources(serverInfo, resourceType); - if (resources != null) { - adsStream.sendDiscoveryRequest(resourceType, resources); - } + adsStream.sendDiscoveryRequest(resourceType, resources == null ? Collections.emptySet() : resources); } /** diff --git a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java index 19877ddcb41..34dfb90558c 100644 --- a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java +++ b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java @@ -2783,13 +2783,15 @@ public void edsAllowRespondAfterUnsubscription() { // 1) the EDS unsubscription caused by CDS PUSH e1 (client-side) and, // 2) the immediate EDS PUSH from XdsServer (server-side) after CDS PUSH e1 (event e2). xdsClient.cancelXdsResourceWatch(XdsEndpointResource.getInstance(), "A.1", edsResourceWatcher); + // FIX(1): allow to send empty subscription + call.verifyRequest(EDS, Collections.emptyList(), VERSION_1, "0000", NODE); verifySubscribedResourcesMetadataSizes(0, 0, 0, 0); // An EDS PUSH after CDS PUSH e1. List endpointsV2 = ImmutableList.of(lbEndpointHealthy); ImmutableMap resourcesV2 = ImmutableMap.of( "A.1", Any.pack(mf.buildClusterLoadAssignment("A.1", endpointsV2, dropOverloads))); call.sendResponse(EDS, resourcesV2.values().asList(), VERSION_2, "0001"); - // This will send an empty resource list to the XdsServer + // FIX(2): allow to update resource version even if the subscription resource is empty call.verifyRequest(EDS, Collections.emptyList(), VERSION_2, "0001", NODE); verifyNoMoreInteractions(edsResourceWatcher); } From 9ed7e27261fa34d53b6fd6f727ec1d31dde4659c Mon Sep 17 00:00:00 2001 From: Megrez Lu Date: Wed, 5 Jun 2024 17:09:55 +0800 Subject: [PATCH 04/12] fix stylecheck --- xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java | 3 ++- xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java | 1 - 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java b/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java index c3cea9f52f3..6c6f0a7104d 100644 --- a/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java +++ b/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java @@ -152,7 +152,8 @@ void adjustResourceSubscription(XdsResourceType resourceType) { startRpcStream(); } Collection resources = resourceStore.getSubscribedResources(serverInfo, resourceType); - adsStream.sendDiscoveryRequest(resourceType, resources == null ? Collections.emptySet() : resources); + adsStream.sendDiscoveryRequest(resourceType, + resources == null ? Collections.emptySet() : resources); } /** diff --git a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java index 34dfb90558c..894dcad7c4e 100644 --- a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java +++ b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java @@ -2761,7 +2761,6 @@ public void edsAllowRespondAfterUnsubscription() { Assume.assumeFalse(ignoreResourceDeletion()); // Suppose we have an EDS subscription A.1 - List subscribedResourceNames = ImmutableList.of("A.1"); xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), "A.1", edsResourceWatcher); DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); assertThat(call).isNotNull(); From 3da29af31e57be1892440219cad3a9a7a8f5c70b Mon Sep 17 00:00:00 2001 From: Megrez Lu Date: Wed, 5 Jun 2024 17:32:10 +0800 Subject: [PATCH 05/12] fix test style Signed-off-by: Megrez Lu --- xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java index 894dcad7c4e..51776eeda37 100644 --- a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java +++ b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java @@ -2777,7 +2777,8 @@ public void edsAllowRespondAfterUnsubscription() { verifyResourceMetadataAcked(EDS, "A.1", resourcesV1.get("A.1"), VERSION_1, TIME_INCREMENT); verify(edsResourceWatcher, times(1)).onChanged(any()); - // trigger an EDS resource unsubscription. This would probably be caused by CDS PUSH(let's say event e1) in the real world. + // trigger an EDS resource unsubscription. + // This would probably be caused by CDS PUSH(let's say event e1) in the real world. // Then there can be a potential data race between // 1) the EDS unsubscription caused by CDS PUSH e1 (client-side) and, // 2) the immediate EDS PUSH from XdsServer (server-side) after CDS PUSH e1 (event e2). From 041809d8017018275b445d842da85af440f9be85 Mon Sep 17 00:00:00 2001 From: Megrez Lu Date: Thu, 6 Jun 2024 09:59:02 +0800 Subject: [PATCH 06/12] revert back changes and cleanup nonce if subscription is empty Signed-off-by: Megrez Lu --- .../java/io/grpc/xds/client/ControlPlaneClient.java | 10 +++++++--- .../main/java/io/grpc/xds/client/XdsClientImpl.java | 1 + 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java b/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java index 6c6f0a7104d..b6cebecdc3a 100644 --- a/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java +++ b/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java @@ -133,7 +133,7 @@ public void run() { xdsTransport.shutdown(); } }); - } + }r @Override public String toString() { @@ -152,8 +152,12 @@ void adjustResourceSubscription(XdsResourceType resourceType) { startRpcStream(); } Collection resources = resourceStore.getSubscribedResources(serverInfo, resourceType); - adsStream.sendDiscoveryRequest(resourceType, - resources == null ? Collections.emptySet() : resources); + if (resources != null) { + adsStream.sendDiscoveryRequest(resourceType, resources); + } else { + // cleanup the nonce for the resource type if it's not subscribed to anymore. + adsStream.respNonces.remove(resourceType); + } } /** diff --git a/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java index 499eecdd035..79147cd9862 100644 --- a/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java @@ -291,6 +291,7 @@ public void run() { } if (resourceSubscribers.get(type).isEmpty()) { resourceSubscribers.remove(type); + subscribedResourceTypeUrls.remove(type.typeUrl()); } } } From b61742c9962172ee12256f7589e5bbf6001c8a39 Mon Sep 17 00:00:00 2001 From: Megrez Lu Date: Thu, 6 Jun 2024 10:07:17 +0800 Subject: [PATCH 07/12] revert back test method --- xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java | 2 +- xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java b/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java index b6cebecdc3a..a2fce546b66 100644 --- a/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java +++ b/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java @@ -133,7 +133,7 @@ public void run() { xdsTransport.shutdown(); } }); - }r + } @Override public String toString() { diff --git a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java index 51776eeda37..2cde0b9e0f5 100644 --- a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java +++ b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java @@ -428,6 +428,7 @@ private void verifyResourceCount( XdsResourceType type, int size) { if (size == 0) { + assertThat(subscribedTypeUrls.containsKey(type.typeUrl())).isFalse(); assertThat(subscribedResourcesMetadata.containsKey(type)).isFalse(); } else { assertThat(subscribedTypeUrls.containsKey(type.typeUrl())).isTrue(); From c8fa46c4a11bf6d860318f8d335b7c59cef018a8 Mon Sep 17 00:00:00 2001 From: Megrez Lu Date: Mon, 17 Jun 2024 10:39:16 +0800 Subject: [PATCH 08/12] add testing methods --- .../grpc/xds/client/ControlPlaneClient.java | 9 ++++++ .../io/grpc/xds/client/XdsClientImpl.java | 2 +- .../grpc/xds/GrpcXdsClientImplTestBase.java | 18 ++++------- .../client/ControlPlaneClientTestBase.java | 30 +++++++++++++++++++ 4 files changed, 46 insertions(+), 13 deletions(-) create mode 100644 xds/src/test/java/io/grpc/xds/client/ControlPlaneClientTestBase.java diff --git a/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java b/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java index a2fce546b66..d6cf8207582 100644 --- a/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java +++ b/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java @@ -205,6 +205,15 @@ boolean isReady() { return adsStream != null && adsStream.call != null && adsStream.call.isReady(); } + @Nullable + @VisibleForTesting + Map, String> getNonce() { + if (adsStream == null) { + return null; + } + return adsStream.respNonces; + } + /** * Starts a timer for each requested resource that hasn't been responded to and * has been waiting for the channel to get ready. diff --git a/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java index 79147cd9862..af91449c9fb 100644 --- a/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java @@ -84,7 +84,7 @@ public void uncaughtException(Thread t, Throwable e) { final Map serverLrsClientMap = new HashMap<>(); - private final Map serverCpClientMap = new HashMap<>(); + final Map serverCpClientMap = new HashMap<>(); private final Map, Map>> resourceSubscribers = new HashMap<>(); diff --git a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java index 2cde0b9e0f5..e0e2a55151e 100644 --- a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java +++ b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java @@ -87,6 +87,7 @@ import io.grpc.xds.client.Bootstrapper.BootstrapInfo; import io.grpc.xds.client.Bootstrapper.CertificateProviderInfo; import io.grpc.xds.client.Bootstrapper.ServerInfo; +import io.grpc.xds.client.ControlPlaneClientTestBase; import io.grpc.xds.client.EnvoyProtoData.Node; import io.grpc.xds.client.LoadStatsManager2.ClusterDropStats; import io.grpc.xds.client.Locality; @@ -140,7 +141,7 @@ @RunWith(JUnit4.class) // The base class was used to test both xds v2 and v3. V2 is dropped now so the base class is not // necessary. Still keep it for future version usage. Remove if too much trouble to maintain. -public abstract class GrpcXdsClientImplTestBase { +public abstract class GrpcXdsClientImplTestBase extends ControlPlaneClientTestBase { private static final String SERVER_URI = "trafficdirector.googleapis.com"; private static final String SERVER_URI_CUSTOME_AUTHORITY = "trafficdirector2.googleapis.com"; private static final String SERVER_URI_EMPTY_AUTHORITY = "trafficdirector3.googleapis.com"; @@ -2758,7 +2759,7 @@ public void edsResourceNotFound() { } @Test - public void edsAllowRespondAfterUnsubscription() { + public void edsCleanupNonceAfterUnsubscription() { Assume.assumeFalse(ignoreResourceDeletion()); // Suppose we have an EDS subscription A.1 @@ -2776,6 +2777,7 @@ public void edsAllowRespondAfterUnsubscription() { call.sendResponse(EDS, resourcesV1.values().asList(), VERSION_1, "0000"); // {A.1} -> ACK, version 1 verifyResourceMetadataAcked(EDS, "A.1", resourcesV1.get("A.1"), VERSION_1, TIME_INCREMENT); + call.verifyRequest(EDS, "A.1", VERSION_1, "0000", NODE); verify(edsResourceWatcher, times(1)).onChanged(any()); // trigger an EDS resource unsubscription. @@ -2784,17 +2786,9 @@ public void edsAllowRespondAfterUnsubscription() { // 1) the EDS unsubscription caused by CDS PUSH e1 (client-side) and, // 2) the immediate EDS PUSH from XdsServer (server-side) after CDS PUSH e1 (event e2). xdsClient.cancelXdsResourceWatch(XdsEndpointResource.getInstance(), "A.1", edsResourceWatcher); - // FIX(1): allow to send empty subscription - call.verifyRequest(EDS, Collections.emptyList(), VERSION_1, "0000", NODE); verifySubscribedResourcesMetadataSizes(0, 0, 0, 0); - // An EDS PUSH after CDS PUSH e1. - List endpointsV2 = ImmutableList.of(lbEndpointHealthy); - ImmutableMap resourcesV2 = ImmutableMap.of( - "A.1", Any.pack(mf.buildClusterLoadAssignment("A.1", endpointsV2, dropOverloads))); - call.sendResponse(EDS, resourcesV2.values().asList(), VERSION_2, "0001"); - // FIX(2): allow to update resource version even if the subscription resource is empty - call.verifyRequest(EDS, Collections.emptyList(), VERSION_2, "0001", NODE); - verifyNoMoreInteractions(edsResourceWatcher); + // The nonce has been removed + assertThat(getNonceForResourceType(xdsClient, xdsServerInfo, EDS)).isNull(); } @Test diff --git a/xds/src/test/java/io/grpc/xds/client/ControlPlaneClientTestBase.java b/xds/src/test/java/io/grpc/xds/client/ControlPlaneClientTestBase.java new file mode 100644 index 00000000000..ae3fd73ab60 --- /dev/null +++ b/xds/src/test/java/io/grpc/xds/client/ControlPlaneClientTestBase.java @@ -0,0 +1,30 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds.client; + +import java.util.Map; + +public abstract class ControlPlaneClientTestBase { + protected static String getNonceForResourceType(XdsClientImpl xdsClient, Bootstrapper.ServerInfo serverInfo, XdsResourceType type) { + ControlPlaneClient controlPlaneClient = xdsClient.serverCpClientMap.get(serverInfo); + Map, String> nonceMap = controlPlaneClient.getNonce(); + if (nonceMap == null) { + return null; + } + return nonceMap.get(type); + } +} From 2b636c27e6c52223727f0e64b54e3251a150e897 Mon Sep 17 00:00:00 2001 From: Megrez Lu Date: Mon, 17 Jun 2024 11:26:57 +0800 Subject: [PATCH 09/12] fix test case --- .../xds/client/ControlPlaneClientTestBase.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/xds/src/test/java/io/grpc/xds/client/ControlPlaneClientTestBase.java b/xds/src/test/java/io/grpc/xds/client/ControlPlaneClientTestBase.java index ae3fd73ab60..1d51946ee0b 100644 --- a/xds/src/test/java/io/grpc/xds/client/ControlPlaneClientTestBase.java +++ b/xds/src/test/java/io/grpc/xds/client/ControlPlaneClientTestBase.java @@ -19,12 +19,14 @@ import java.util.Map; public abstract class ControlPlaneClientTestBase { - protected static String getNonceForResourceType(XdsClientImpl xdsClient, Bootstrapper.ServerInfo serverInfo, XdsResourceType type) { - ControlPlaneClient controlPlaneClient = xdsClient.serverCpClientMap.get(serverInfo); - Map, String> nonceMap = controlPlaneClient.getNonce(); - if (nonceMap == null) { - return null; - } - return nonceMap.get(type); + protected static String getNonceForResourceType(XdsClientImpl xdsClient, + Bootstrapper.ServerInfo serverInfo, + XdsResourceType type) { + ControlPlaneClient controlPlaneClient = xdsClient.serverCpClientMap.get(serverInfo); + Map, String> nonceMap = controlPlaneClient.getNonce(); + if (nonceMap == null) { + return null; } + return nonceMap.get(type); + } } From 09f18f1b7826a27c032c7db1d5d9faf02110b595 Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Wed, 24 Jul 2024 12:02:57 -0700 Subject: [PATCH 10/12] Clean up version as well; verify traffic to control plane --- .../grpc/xds/client/ControlPlaneClient.java | 3 +- .../grpc/xds/GrpcXdsClientImplTestBase.java | 30 +++++++++++------ .../io/grpc/xds/GrpcXdsClientImplV3Test.java | 5 +-- .../client/ControlPlaneClientTestBase.java | 32 ------------------- 4 files changed, 25 insertions(+), 45 deletions(-) delete mode 100644 xds/src/test/java/io/grpc/xds/client/ControlPlaneClientTestBase.java diff --git a/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java b/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java index d6cf8207582..b573e9cc78e 100644 --- a/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java +++ b/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java @@ -155,7 +155,8 @@ void adjustResourceSubscription(XdsResourceType resourceType) { if (resources != null) { adsStream.sendDiscoveryRequest(resourceType, resources); } else { - // cleanup the nonce for the resource type if it's not subscribed to anymore. + // The resource type no longer has subscribing resources; clean up references to it + versions.remove(resourceType); adsStream.respNonces.remove(resourceType); } } diff --git a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java index e0e2a55151e..6fefeba13e6 100644 --- a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java +++ b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java @@ -134,6 +134,7 @@ import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; import org.mockito.stubbing.Answer; +import org.mockito.verification.VerificationMode; /** * Tests for {@link XdsClientImpl}. @@ -2766,8 +2767,7 @@ public void edsCleanupNonceAfterUnsubscription() { xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), "A.1", edsResourceWatcher); DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); assertThat(call).isNotNull(); - verifyResourceMetadataRequested(EDS, "A.1"); - verifySubscribedResourcesMetadataSizes(0, 0, 0, 1); + call.verifyRequest(EDS, "A.1", "", "", NODE); // EDS -> {A.1}, version 1 List dropOverloads = ImmutableList.of(); @@ -2776,19 +2776,17 @@ public void edsCleanupNonceAfterUnsubscription() { "A.1", Any.pack(mf.buildClusterLoadAssignment("A.1", endpointsV1, dropOverloads))); call.sendResponse(EDS, resourcesV1.values().asList(), VERSION_1, "0000"); // {A.1} -> ACK, version 1 - verifyResourceMetadataAcked(EDS, "A.1", resourcesV1.get("A.1"), VERSION_1, TIME_INCREMENT); call.verifyRequest(EDS, "A.1", VERSION_1, "0000", NODE); verify(edsResourceWatcher, times(1)).onChanged(any()); // trigger an EDS resource unsubscription. - // This would probably be caused by CDS PUSH(let's say event e1) in the real world. - // Then there can be a potential data race between - // 1) the EDS unsubscription caused by CDS PUSH e1 (client-side) and, - // 2) the immediate EDS PUSH from XdsServer (server-side) after CDS PUSH e1 (event e2). xdsClient.cancelXdsResourceWatch(XdsEndpointResource.getInstance(), "A.1", edsResourceWatcher); verifySubscribedResourcesMetadataSizes(0, 0, 0, 0); - // The nonce has been removed - assertThat(getNonceForResourceType(xdsClient, xdsServerInfo, EDS)).isNull(); + + // When re-subscribing, the version and nonce were properly forgotten, so the request is the + // same as the initial request + xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), "A.1", edsResourceWatcher); + call.verifyRequest(EDS, "A.1", "", "", NODE, Mockito.timeout(2000).times(2)); } @Test @@ -3821,10 +3819,22 @@ protected abstract static class DiscoveryRpcCall { protected void verifyRequest( XdsResourceType type, List resources, String versionInfo, String nonce, - Node node) { + Node node, VerificationMode verificationMode) { throw new UnsupportedOperationException(); } + protected void verifyRequest( + XdsResourceType type, List resources, String versionInfo, String nonce, + Node node) { + verifyRequest(type, resources, versionInfo, nonce, node, Mockito.timeout(2000)); + } + + protected void verifyRequest( + XdsResourceType type, String resource, String versionInfo, String nonce, + Node node, VerificationMode verificationMode) { + verifyRequest(type, ImmutableList.of(resource), versionInfo, nonce, node, verificationMode); + } + protected void verifyRequest( XdsResourceType type, String resource, String versionInfo, String nonce, Node node) { verifyRequest(type, ImmutableList.of(resource), versionInfo, nonce, node); diff --git a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplV3Test.java b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplV3Test.java index 71d0895a252..2b2ce5cbd72 100644 --- a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplV3Test.java +++ b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplV3Test.java @@ -118,6 +118,7 @@ import org.mockito.ArgumentMatcher; import org.mockito.InOrder; import org.mockito.Mockito; +import org.mockito.verification.VerificationMode; /** * Tests for {@link XdsClientImpl} with protocol version v3. @@ -205,8 +206,8 @@ private DiscoveryRpcCallV3(StreamObserver requestObserver, @Override protected void verifyRequest( XdsResourceType type, List resources, String versionInfo, String nonce, - EnvoyProtoData.Node node) { - verify(requestObserver, Mockito.timeout(2000)).onNext(argThat(new DiscoveryRequestMatcher( + EnvoyProtoData.Node node, VerificationMode verificationMode) { + verify(requestObserver, verificationMode).onNext(argThat(new DiscoveryRequestMatcher( node.toEnvoyProtoNode(), versionInfo, resources, type.typeUrl(), nonce, null, null))); } diff --git a/xds/src/test/java/io/grpc/xds/client/ControlPlaneClientTestBase.java b/xds/src/test/java/io/grpc/xds/client/ControlPlaneClientTestBase.java deleted file mode 100644 index 1d51946ee0b..00000000000 --- a/xds/src/test/java/io/grpc/xds/client/ControlPlaneClientTestBase.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright 2024 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.xds.client; - -import java.util.Map; - -public abstract class ControlPlaneClientTestBase { - protected static String getNonceForResourceType(XdsClientImpl xdsClient, - Bootstrapper.ServerInfo serverInfo, - XdsResourceType type) { - ControlPlaneClient controlPlaneClient = xdsClient.serverCpClientMap.get(serverInfo); - Map, String> nonceMap = controlPlaneClient.getNonce(); - if (nonceMap == null) { - return null; - } - return nonceMap.get(type); - } -} From c84946a848c282a215796373e7f810e007e06cf5 Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Wed, 24 Jul 2024 12:03:13 -0700 Subject: [PATCH 11/12] Delete invasive test inspection methods --- .../main/java/io/grpc/xds/client/ControlPlaneClient.java | 9 --------- xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java | 2 +- .../test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java | 3 +-- 3 files changed, 2 insertions(+), 12 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java b/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java index b573e9cc78e..841f73a1f74 100644 --- a/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java +++ b/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java @@ -206,15 +206,6 @@ boolean isReady() { return adsStream != null && adsStream.call != null && adsStream.call.isReady(); } - @Nullable - @VisibleForTesting - Map, String> getNonce() { - if (adsStream == null) { - return null; - } - return adsStream.respNonces; - } - /** * Starts a timer for each requested resource that hasn't been responded to and * has been waiting for the channel to get ready. diff --git a/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java index af91449c9fb..79147cd9862 100644 --- a/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java @@ -84,7 +84,7 @@ public void uncaughtException(Thread t, Throwable e) { final Map serverLrsClientMap = new HashMap<>(); - final Map serverCpClientMap = new HashMap<>(); + private final Map serverCpClientMap = new HashMap<>(); private final Map, Map>> resourceSubscribers = new HashMap<>(); diff --git a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java index 6fefeba13e6..6c117d88575 100644 --- a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java +++ b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java @@ -87,7 +87,6 @@ import io.grpc.xds.client.Bootstrapper.BootstrapInfo; import io.grpc.xds.client.Bootstrapper.CertificateProviderInfo; import io.grpc.xds.client.Bootstrapper.ServerInfo; -import io.grpc.xds.client.ControlPlaneClientTestBase; import io.grpc.xds.client.EnvoyProtoData.Node; import io.grpc.xds.client.LoadStatsManager2.ClusterDropStats; import io.grpc.xds.client.Locality; @@ -142,7 +141,7 @@ @RunWith(JUnit4.class) // The base class was used to test both xds v2 and v3. V2 is dropped now so the base class is not // necessary. Still keep it for future version usage. Remove if too much trouble to maintain. -public abstract class GrpcXdsClientImplTestBase extends ControlPlaneClientTestBase { +public abstract class GrpcXdsClientImplTestBase { private static final String SERVER_URI = "trafficdirector.googleapis.com"; private static final String SERVER_URI_CUSTOME_AUTHORITY = "trafficdirector2.googleapis.com"; private static final String SERVER_URI_EMPTY_AUTHORITY = "trafficdirector3.googleapis.com"; From c2da8a0f7982b2da83a6bee88b146f7e49c3a027 Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Mon, 29 Jul 2024 14:59:52 -0700 Subject: [PATCH 12/12] Send unsubscribing DiscoveryRequest --- .../main/java/io/grpc/xds/client/ControlPlaneClient.java | 8 +++++--- .../test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java | 1 + 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java b/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java index 841f73a1f74..3074d1120ad 100644 --- a/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java +++ b/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java @@ -152,9 +152,11 @@ void adjustResourceSubscription(XdsResourceType resourceType) { startRpcStream(); } Collection resources = resourceStore.getSubscribedResources(serverInfo, resourceType); - if (resources != null) { - adsStream.sendDiscoveryRequest(resourceType, resources); - } else { + if (resources == null) { + resources = Collections.emptyList(); + } + adsStream.sendDiscoveryRequest(resourceType, resources); + if (resources.isEmpty()) { // The resource type no longer has subscribing resources; clean up references to it versions.remove(resourceType); adsStream.respNonces.remove(resourceType); diff --git a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java index 6c117d88575..6b04edcb9b8 100644 --- a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java +++ b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java @@ -2781,6 +2781,7 @@ public void edsCleanupNonceAfterUnsubscription() { // trigger an EDS resource unsubscription. xdsClient.cancelXdsResourceWatch(XdsEndpointResource.getInstance(), "A.1", edsResourceWatcher); verifySubscribedResourcesMetadataSizes(0, 0, 0, 0); + call.verifyRequest(EDS, Arrays.asList(), VERSION_1, "0000", NODE); // When re-subscribing, the version and nonce were properly forgotten, so the request is the // same as the initial request