Skip to content

Commit 08e31c1

Browse files
committed
code review
fix the issue when the current connection is invalidated fix issue with custom domains during failover
1 parent f95e771 commit 08e31c1

File tree

9 files changed

+162
-48
lines changed

9 files changed

+162
-48
lines changed

wrapper/src/main/java/software/amazon/jdbc/dialect/DialectManager.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import software.amazon.jdbc.Driver;
3030
import software.amazon.jdbc.HostSpec;
3131
import software.amazon.jdbc.PluginService;
32+
import software.amazon.jdbc.PropertyDefinition;
3233
import software.amazon.jdbc.util.CacheMap;
3334
import software.amazon.jdbc.util.ConnectionUrlParser;
3435
import software.amazon.jdbc.util.Messages;
@@ -81,7 +82,11 @@ public class DialectManager implements DialectProvider {
8182
private Dialect dialect = null;
8283
private String dialectCode;
8384

84-
private PluginService pluginService;
85+
private final PluginService pluginService;
86+
87+
static {
88+
PropertyDefinition.registerPluginProperties(DialectManager.class);
89+
}
8590

8691
public DialectManager(PluginService pluginService) {
8792
this.pluginService = pluginService;

wrapper/src/main/java/software/amazon/jdbc/plugin/AuroraConnectionTrackerPlugin.java

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import java.util.Map;
2626
import java.util.Properties;
2727
import java.util.Set;
28+
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.atomic.AtomicLong;
2830
import java.util.logging.Logger;
2931
import org.checkerframework.checker.nullness.qual.NonNull;
3032
import software.amazon.jdbc.HostRole;
@@ -43,17 +45,24 @@ public class AuroraConnectionTrackerPlugin extends AbstractConnectionPlugin impl
4345

4446
private static final Logger LOGGER = Logger.getLogger(AuroraConnectionTrackerPlugin.class.getName());
4547

48+
// Check topology changes 3 min after last failover
49+
private static final long TOPOLOGY_CHANGES_EXPECTED_TIME_MS = TimeUnit.MINUTES.toNanos(3);
50+
4651
static final String METHOD_ABORT = "Connection.abort";
4752
static final String METHOD_CLOSE = "Connection.close";
4853
private static final Set<String> subscribedMethods =
4954
Collections.unmodifiableSet(new HashSet<String>() {
5055
{
5156
addAll(SubscribedMethodHelper.NETWORK_BOUND_METHODS);
57+
add(METHOD_CLOSE);
58+
add(METHOD_ABORT);
5259
add("connect");
5360
add("notifyNodeListChanged");
5461
}
5562
});
5663

64+
private static final AtomicLong hostListRefreshThresholdTimeNano = new AtomicLong(0);
65+
5766
private final PluginService pluginService;
5867
private final RdsUtils rdsHelper;
5968
private final OpenedConnectionTracker tracker;
@@ -87,7 +96,7 @@ public Connection connect(final String driverProtocol, final HostSpec hostSpec,
8796

8897
if (conn != null) {
8998
final RdsUrlType type = this.rdsHelper.identifyRdsType(hostSpec.getHost());
90-
if (type.isRdsCluster()) {
99+
if (type.isRdsCluster() || type == RdsUrlType.OTHER) {
91100
hostSpec.resetAliases();
92101
this.pluginService.fillAliases(conn, hostSpec);
93102
}
@@ -106,23 +115,45 @@ public <T, E extends Exception> T execute(final Class<T> resultClass, final Clas
106115
this.rememberWriter();
107116

108117
try {
118+
if (!methodName.equals(METHOD_CLOSE) && !methodName.equals(METHOD_ABORT)) {
119+
long localRefreshHostListTillTimeNano = hostListRefreshThresholdTimeNano.get();
120+
boolean needRefreshHostLists = false;
121+
if (localRefreshHostListTillTimeNano > 0) {
122+
if (localRefreshHostListTillTimeNano < System.nanoTime()) {
123+
needRefreshHostLists = true;
124+
} else {
125+
hostListRefreshThresholdTimeNano.compareAndSet(localRefreshHostListTillTimeNano, 0);
126+
}
127+
}
128+
if (this.needUpdateCurrentWriter || needRefreshHostLists) {
129+
// Calling this method may effectively close/abort a current connection
130+
this.checkWriterChanged(needRefreshHostLists);
131+
}
132+
}
109133
final T result = jdbcMethodFunc.call();
110134
if ((methodName.equals(METHOD_CLOSE) || methodName.equals(METHOD_ABORT))) {
111-
tracker.invalidateCurrentConnection(currentHostSpec, this.pluginService.getCurrentConnection());
112-
} else if (this.needUpdateCurrentWriter) {
113-
this.checkWriterChanged();
135+
tracker.removeConnectionTracking(currentHostSpec, this.pluginService.getCurrentConnection());
114136
}
115137
return result;
116138

117139
} catch (final Exception e) {
118140
if (e instanceof FailoverSQLException) {
119-
this.checkWriterChanged();
141+
hostListRefreshThresholdTimeNano.set(System.nanoTime() + TOPOLOGY_CHANGES_EXPECTED_TIME_MS);
142+
// Calling this method may effectively close/abort a current connection
143+
this.checkWriterChanged(true);
120144
}
121145
throw e;
122146
}
123147
}
124148

125-
private void checkWriterChanged() {
149+
private void checkWriterChanged(boolean needRefreshHostLists) {
150+
if (needRefreshHostLists) {
151+
try {
152+
this.pluginService.refreshHostList();
153+
} catch (SQLException ex) {
154+
// do nothing
155+
}
156+
}
126157
final HostSpec hostSpecAfterFailover = this.getWriter(this.pluginService.getAllHosts());
127158

128159
if (this.currentWriter == null) {
@@ -135,6 +166,7 @@ private void checkWriterChanged() {
135166
tracker.logOpenedConnections();
136167
this.currentWriter = hostSpecAfterFailover;
137168
this.needUpdateCurrentWriter = false;
169+
hostListRefreshThresholdTimeNano.set(0);
138170
}
139171
}
140172

wrapper/src/main/java/software/amazon/jdbc/plugin/AuroraInitialConnectionStrategyPlugin.java

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
import java.sql.Connection;
2020
import java.sql.SQLException;
2121
import java.util.Collections;
22+
import java.util.HashMap;
2223
import java.util.HashSet;
24+
import java.util.Map;
2325
import java.util.Properties;
2426
import java.util.Set;
2527
import java.util.concurrent.TimeUnit;
@@ -67,16 +69,47 @@ public class AuroraInitialConnectionStrategyPlugin extends AbstractConnectionPlu
6769
"1000",
6870
"Time between each retry of opening a connection.");
6971

72+
public static final AwsWrapperProperty VERIFY_OPENED_CONNECTION_TYPE =
73+
new AwsWrapperProperty(
74+
"verifyOpenedConnectionType",
75+
null,
76+
"Force to verify an opened connection to be either a writer or a reader.");
77+
78+
private enum VerifyOpenedConnectionType {
79+
WRITER,
80+
READER;
81+
82+
private static final Map<String, VerifyOpenedConnectionType> nameToValue =
83+
new HashMap<String, VerifyOpenedConnectionType>() {
84+
{
85+
put("writer", WRITER);
86+
put("reader", READER);
87+
}
88+
};
89+
90+
public static VerifyOpenedConnectionType fromValue(String value) {
91+
if (value == null) {
92+
return null;
93+
}
94+
return nameToValue.get(value.toLowerCase());
95+
}
96+
}
97+
7098
private final PluginService pluginService;
7199
private HostListProviderService hostListProviderService;
72100
private final RdsUtils rdsUtils = new RdsUtils();
73101

102+
private VerifyOpenedConnectionType verifyOpenedConnectionType = null;
103+
104+
74105
static {
75106
PropertyDefinition.registerPluginProperties(AuroraInitialConnectionStrategyPlugin.class);
76107
}
77108

78109
public AuroraInitialConnectionStrategyPlugin(final PluginService pluginService, final Properties properties) {
79110
this.pluginService = pluginService;
111+
this.verifyOpenedConnectionType =
112+
VerifyOpenedConnectionType.fromValue(VERIFY_OPENED_CONNECTION_TYPE.getString(properties));
80113
}
81114

82115
@Override
@@ -110,12 +143,8 @@ public Connection connect(
110143

111144
final RdsUrlType type = this.rdsUtils.identifyRdsType(hostSpec.getHost());
112145

113-
if (!type.isRdsCluster()) {
114-
// It's not a cluster endpoint. Continue with a normal workflow.
115-
return connectFunc.call();
116-
}
117-
118-
if (type == RdsUrlType.RDS_WRITER_CLUSTER) {
146+
if (type == RdsUrlType.RDS_WRITER_CLUSTER
147+
|| isInitialConnection && this.verifyOpenedConnectionType == VerifyOpenedConnectionType.WRITER) {
119148
Connection writerCandidateConn = this.getVerifiedWriterConnection(props, isInitialConnection, connectFunc);
120149
if (writerCandidateConn == null) {
121150
// Can't get writer connection. Continue with a normal workflow.
@@ -124,7 +153,8 @@ public Connection connect(
124153
return writerCandidateConn;
125154
}
126155

127-
if (type == RdsUrlType.RDS_READER_CLUSTER) {
156+
if (type == RdsUrlType.RDS_READER_CLUSTER
157+
|| isInitialConnection && this.verifyOpenedConnectionType == VerifyOpenedConnectionType.READER) {
128158
Connection readerCandidateConn = this.getVerifiedReaderConnection(props, isInitialConnection, connectFunc);
129159
if (readerCandidateConn == null) {
130160
// Can't get a reader connection. Continue with a normal workflow.

wrapper/src/main/java/software/amazon/jdbc/plugin/OpenedConnectionTracker.java

Lines changed: 43 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,14 @@
2020
import java.sql.Connection;
2121
import java.sql.SQLException;
2222
import java.util.Arrays;
23+
import java.util.HashSet;
2324
import java.util.Map;
2425
import java.util.Objects;
25-
import java.util.Optional;
2626
import java.util.Queue;
2727
import java.util.Set;
2828
import java.util.concurrent.ConcurrentHashMap;
2929
import java.util.concurrent.ConcurrentLinkedQueue;
30+
import java.util.concurrent.Executor;
3031
import java.util.concurrent.ExecutorService;
3132
import java.util.concurrent.Executors;
3233
import java.util.logging.Logger;
@@ -35,6 +36,7 @@
3536
import software.amazon.jdbc.util.Messages;
3637
import software.amazon.jdbc.util.RdsUtils;
3738
import software.amazon.jdbc.util.StringUtils;
39+
import software.amazon.jdbc.util.SynchronousExecutor;
3840
import software.amazon.jdbc.util.telemetry.TelemetryContext;
3941
import software.amazon.jdbc.util.telemetry.TelemetryFactory;
4042
import software.amazon.jdbc.util.telemetry.TelemetryTraceLevel;
@@ -50,16 +52,17 @@ public class OpenedConnectionTracker {
5052
invalidateThread.setDaemon(true);
5153
return invalidateThread;
5254
});
53-
private static final ExecutorService abortConnectionExecutorService =
54-
Executors.newCachedThreadPool(
55-
r -> {
56-
final Thread abortThread = new Thread(r);
57-
abortThread.setDaemon(true);
58-
return abortThread;
59-
});
55+
private static final Executor abortConnectionExecutor = new SynchronousExecutor();
6056

6157
private static final Logger LOGGER = Logger.getLogger(OpenedConnectionTracker.class.getName());
6258
private static final RdsUtils rdsUtils = new RdsUtils();
59+
60+
private static final Set<String> safeToCheckClosedClasses = new HashSet<>(Arrays.asList(
61+
"HikariProxyConnection",
62+
"org.postgresql.jdbc.PgConnection",
63+
"com.mysql.cj.jdbc.ConnectionImpl",
64+
"org.mariadb.jdbc.Connection"));
65+
6366
private final PluginService pluginService;
6467

6568
public OpenedConnectionTracker(final PluginService pluginService) {
@@ -72,6 +75,7 @@ public void populateOpenedConnectionQueue(final HostSpec hostSpec, final Connect
7275
// Check if the connection was established using an instance endpoint
7376
if (rdsUtils.isRdsInstance(hostSpec.getHost())) {
7477
trackConnection(hostSpec.getHostAndPort(), conn);
78+
logOpenedConnections();
7579
return;
7680
}
7781

@@ -80,14 +84,17 @@ public void populateOpenedConnectionQueue(final HostSpec hostSpec, final Connect
8084
.max(String::compareToIgnoreCase)
8185
.orElse(null);
8286

83-
if (instanceEndpoint == null) {
84-
LOGGER.finest(
85-
Messages.get("OpenedConnectionTracker.unableToPopulateOpenedConnectionQueue",
86-
new Object[] {hostSpec.getHost()}));
87+
if (instanceEndpoint != null) {
88+
trackConnection(instanceEndpoint, conn);
89+
logOpenedConnections();
8790
return;
8891
}
8992

90-
trackConnection(instanceEndpoint, conn);
93+
// It seems there's no RDS instance host found. It might be a custom domain name. Let's track by all aliases
94+
for (String alias : aliases) {
95+
trackConnection(alias, conn);
96+
}
97+
logOpenedConnections();
9198
}
9299

93100
/**
@@ -100,28 +107,27 @@ public void invalidateAllConnections(final HostSpec hostSpec) {
100107
invalidateAllConnections(hostSpec.getAliases().toArray(new String[] {}));
101108
}
102109

103-
public void invalidateAllConnections(final String... node) {
110+
public void invalidateAllConnections(final String... keys) {
104111
TelemetryFactory telemetryFactory = this.pluginService.getTelemetryFactory();
105112
TelemetryContext telemetryContext = telemetryFactory.openTelemetryContext(
106113
TELEMETRY_INVALIDATE_CONNECTIONS, TelemetryTraceLevel.NESTED);
107114

108115
try {
109-
final Optional<String> instanceEndpoint = Arrays.stream(node)
110-
.filter(x -> rdsUtils.isRdsInstance(rdsUtils.removePort(x)))
111-
.findFirst();
112-
if (!instanceEndpoint.isPresent()) {
113-
return;
116+
for (String key : keys) {
117+
try {
118+
final Queue<WeakReference<Connection>> connectionQueue = openedConnections.get(key);
119+
logConnectionQueue(key, connectionQueue);
120+
invalidateConnections(connectionQueue);
121+
} catch (Exception ex) {
122+
// ignore and continue
123+
}
114124
}
115-
final Queue<WeakReference<Connection>> connectionQueue = openedConnections.get(instanceEndpoint.get());
116-
logConnectionQueue(instanceEndpoint.get(), connectionQueue);
117-
invalidateConnections(openedConnections.get(instanceEndpoint.get()));
118-
119125
} finally {
120126
telemetryContext.closeContext();
121127
}
122128
}
123129

124-
public void invalidateCurrentConnection(final HostSpec hostSpec, final Connection connection) {
130+
public void removeConnectionTracking(final HostSpec hostSpec, final Connection connection) {
125131
final String host = rdsUtils.isRdsInstance(hostSpec.getHost())
126132
? hostSpec.asAlias()
127133
: hostSpec.getAliases().stream()
@@ -134,8 +140,11 @@ public void invalidateCurrentConnection(final HostSpec hostSpec, final Connectio
134140
}
135141

136142
final Queue<WeakReference<Connection>> connectionQueue = openedConnections.get(host);
137-
logConnectionQueue(host, connectionQueue);
138-
connectionQueue.removeIf(connectionWeakReference -> Objects.equals(connectionWeakReference.get(), connection));
143+
if (connectionQueue != null) {
144+
logConnectionQueue(host, connectionQueue);
145+
connectionQueue.removeIf(connectionWeakReference -> connectionWeakReference != null
146+
&& Objects.equals(connectionWeakReference.get(), connection));
147+
}
139148
}
140149

141150
private void trackConnection(final String instanceEndpoint, final Connection connection) {
@@ -144,10 +153,12 @@ private void trackConnection(final String instanceEndpoint, final Connection con
144153
instanceEndpoint,
145154
(k) -> new ConcurrentLinkedQueue<>());
146155
connectionQueue.add(new WeakReference<>(connection));
147-
logOpenedConnections();
148156
}
149157

150158
private void invalidateConnections(final Queue<WeakReference<Connection>> connectionQueue) {
159+
if (connectionQueue == null || connectionQueue.isEmpty()) {
160+
return;
161+
}
151162
invalidateConnectionsExecutorService.submit(() -> {
152163
WeakReference<Connection> connReference;
153164
while ((connReference = connectionQueue.poll()) != null) {
@@ -157,7 +168,7 @@ private void invalidateConnections(final Queue<WeakReference<Connection>> connec
157168
}
158169

159170
try {
160-
conn.abort(abortConnectionExecutorService);
171+
conn.abort(abortConnectionExecutor);
161172
} catch (final SQLException e) {
162173
// swallow this exception, current connection should be useless anyway.
163174
}
@@ -204,7 +215,10 @@ public void pruneNullConnections() {
204215
if (conn == null) {
205216
return true;
206217
}
207-
if (conn.getClass().getSimpleName().equals("HikariProxyConnection")) {
218+
// The following classes do not check connection validity by calling a DB server
219+
// so it's safe to check whether connection is already closed.
220+
if (safeToCheckClosedClasses.contains(conn.getClass().getSimpleName())
221+
|| safeToCheckClosedClasses.contains(conn.getClass().getName())) {
208222
try {
209223
return conn.isClosed();
210224
} catch (SQLException ex) {

0 commit comments

Comments
 (0)