Skip to content

Commit 0608da8

Browse files
LB-YuLiebing
authored and committed
[rpc] Avoid NettyClient#getOrCreateConnection run to deadlock in JDK8
1 parent dbbfc79 commit 0608da8

File tree

1 file changed

+49
-14
lines changed

1 file changed

+49
-14
lines changed

fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClient.java

Lines changed: 49 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import java.util.List;
4646
import java.util.Map;
4747
import java.util.concurrent.CompletableFuture;
48+
import java.util.concurrent.ExecutionException;
4849
import java.util.concurrent.TimeUnit;
4950
import java.util.function.Supplier;
5051

@@ -71,6 +72,8 @@ public final class NettyClient implements RpcClient {
7172
*/
7273
private final Map<String, ServerConnection> connections;
7374

75+
private final Map<String, CompletableFuture<ServerConnection>> connectionFutures;
76+
7477
/** Metric groups for client. */
7578
private final ClientMetricGroup clientMetricGroup;
7679

@@ -87,6 +90,7 @@ public final class NettyClient implements RpcClient {
8790
public NettyClient(
8891
Configuration conf, ClientMetricGroup clientMetricGroup, boolean isInnerClient) {
8992
this.connections = MapUtils.newConcurrentHashMap();
93+
this.connectionFutures = MapUtils.newConcurrentHashMap();
9094

9195
// build bootstrap
9296
this.eventGroup =
@@ -188,20 +192,51 @@ public void close() throws Exception {
188192

189193
private ServerConnection getOrCreateConnection(ServerNode node) {
190194
String serverId = node.uid();
191-
return connections.computeIfAbsent(
192-
serverId,
193-
ignored -> {
194-
LOG.debug("Creating connection to server {}.", node);
195-
ServerConnection connection =
196-
new ServerConnection(
197-
bootstrap,
198-
node,
199-
clientMetricGroup,
200-
authenticatorSupplier.get(),
201-
isInnerClient);
202-
connection.whenClose(ignore -> connections.remove(serverId, connection));
203-
return connection;
204-
});
195+
196+
ServerConnection existing = connections.get(serverId);
197+
if (existing != null) {
198+
return existing;
199+
}
200+
201+
CompletableFuture<ServerConnection> newFuture = new CompletableFuture<>();
202+
CompletableFuture<ServerConnection> f = connectionFutures.putIfAbsent(serverId, newFuture);
203+
if (f == null) {
204+
f = newFuture;
205+
try {
206+
LOG.debug("Creating connection to server {}.", node);
207+
ServerConnection conn =
208+
new ServerConnection(
209+
bootstrap,
210+
node,
211+
clientMetricGroup,
212+
authenticatorSupplier.get(),
213+
isInnerClient);
214+
215+
// We must add the connection to the connections map before registering close
216+
// callback.
217+
// Otherwise, the connection may never be removed from the connections map if the
218+
// connection close immediately.
219+
connections.put(serverId, conn);
220+
conn.whenClose(ignored -> connections.remove(serverId, conn));
221+
222+
newFuture.complete(conn);
223+
} finally {
224+
connectionFutures.remove(serverId, newFuture);
225+
}
226+
}
227+
228+
try {
229+
return f.get();
230+
} catch (InterruptedException e) {
231+
Thread.currentThread().interrupt();
232+
throw new RuntimeException(e);
233+
} catch (ExecutionException e) {
234+
Throwable cause = e.getCause();
235+
if (cause instanceof RuntimeException) {
236+
throw (RuntimeException) cause;
237+
}
238+
throw new RuntimeException(cause);
239+
}
205240
}
206241

207242
@VisibleForTesting

0 commit comments

Comments
 (0)