3030import org .elasticsearch .threadpool .ThreadPool ;
3131
3232import java .util .Collections ;
33+ import java .util .concurrent .Semaphore ;
3334import java .util .concurrent .TimeUnit ;
3435
3536import static org .elasticsearch .transport .RemoteClusterConnectionTests .startTransport ;
@@ -69,7 +70,6 @@ public void testConnectAndExecuteRequest() throws Exception {
6970 }
7071 }
7172
72- @ AwaitsFix (bugUrl = "https://github.com/elastic/elasticsearch/issues/29547" )
7373 public void testEnsureWeReconnect () throws Exception {
7474 Settings remoteSettings = Settings .builder ().put (ClusterName .CLUSTER_NAME_SETTING .getKey (), "foo_bar_cluster" ).build ();
7575 try (MockTransportService remoteTransport = startTransport ("remote_node" , Collections .emptyList (), Version .CURRENT , threadPool ,
@@ -79,17 +79,35 @@ public void testEnsureWeReconnect() throws Exception {
7979 .put (RemoteClusterService .ENABLE_REMOTE_CLUSTERS .getKey (), true )
8080 .put ("search.remote.test.seeds" , remoteNode .getAddress ().getAddress () + ":" + remoteNode .getAddress ().getPort ()).build ();
8181 try (MockTransportService service = MockTransportService .createNewService (localSettings , Version .CURRENT , threadPool , null )) {
82+ Semaphore semaphore = new Semaphore (1 );
8283 service .start ();
84+ service .addConnectionListener (new TransportConnectionListener () {
85+ @ Override
86+ public void onNodeDisconnected (DiscoveryNode node ) {
87+ if (remoteNode .equals (node )) {
88+ semaphore .release ();
89+ }
90+ }
91+ });
92+ // this test is not perfect since we might reconnect concurrently but it will fail most of the time if we don't have
93+ // the right calls in place in the RemoteAwareClient
8394 service .acceptIncomingRequests ();
84- service .disconnectFromNode (remoteNode );
85- RemoteClusterService remoteClusterService = service .getRemoteClusterService ();
86- assertBusy (() -> assertFalse (remoteClusterService .isRemoteNodeConnected ("test" , remoteNode )));
87- Client client = remoteClusterService .getRemoteClusterClient (threadPool , "test" );
88- ClusterStateResponse clusterStateResponse = client .admin ().cluster ().prepareState ().execute ().get ();
89- assertNotNull (clusterStateResponse );
90- assertEquals ("foo_bar_cluster" , clusterStateResponse .getState ().getClusterName ().value ());
95+ for (int i = 0 ; i < 10 ; i ++) {
96+ semaphore .acquire ();
97+ try {
98+ service .disconnectFromNode (remoteNode );
99+ semaphore .acquire ();
100+ RemoteClusterService remoteClusterService = service .getRemoteClusterService ();
101+ Client client = remoteClusterService .getRemoteClusterClient (threadPool , "test" );
102+ ClusterStateResponse clusterStateResponse = client .admin ().cluster ().prepareState ().execute ().get ();
103+ assertNotNull (clusterStateResponse );
104+ assertEquals ("foo_bar_cluster" , clusterStateResponse .getState ().getClusterName ().value ());
105+ assertTrue (remoteClusterService .isRemoteNodeConnected ("test" , remoteNode ));
106+ } finally {
107+ semaphore .release ();
108+ }
109+ }
91110 }
92111 }
93112 }
94-
95113}
0 commit comments