Skip to content

Commit e1bec19

Browse files
committed
Remove retry handling for read sessions
The plan of automatically handle retries on read sessions didn't really pan out since we don't really control when data is transported over the network and errors are noticed. Instead we treat reads and writes in the same way, i.e. throwing a `SessionExpiredException` on all connection failures.
1 parent e47dcf6 commit e1bec19

File tree

14 files changed

+228
-154
lines changed

14 files changed

+228
-154
lines changed

driver/src/main/java/org/neo4j/driver/internal/ClusterDriver.java

Lines changed: 49 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.neo4j.driver.internal.security.SecurityPlan;
2727
import org.neo4j.driver.internal.spi.Connection;
2828
import org.neo4j.driver.internal.util.Consumer;
29-
import org.neo4j.driver.internal.util.Supplier;
3029
import org.neo4j.driver.v1.Logging;
3130
import org.neo4j.driver.v1.Record;
3231
import org.neo4j.driver.v1.Session;
@@ -180,6 +179,10 @@ private void callWithRetry(String procedureName, Consumer<Record> recorder )
180179
private synchronized void forget( BoltServerAddress address )
181180
{
182181
connections.purge( address );
182+
if ( endpoints.contains( address ) )
183+
{
184+
endpoints.clear();
185+
}
183186
}
184187

185188
@Override
@@ -191,32 +194,17 @@ public Session session()
191194
@Override
192195
public Session session( final SessionMode mode )
193196
{
194-
switch ( mode )
197+
return new ClusteredNetworkSession( acquireConnection( mode ), clusterSettings, new Consumer<BoltServerAddress>()
195198
{
196-
case READ:
197-
return new ReadNetworkSession( new Supplier<Connection>()
199+
@Override
200+
public void accept( BoltServerAddress address )
198201
{
199-
@Override
200-
public Connection get()
201-
{
202-
return acquireConnection( mode );
203-
}
204-
}, new Consumer<Connection>()
205-
{
206-
@Override
207-
public void accept( Connection connection )
208-
{
209-
forget( connection.address() );
210-
}
211-
}, clusterSettings, log );
212-
case WRITE:
213-
return new WriteNetworkSession( acquireConnection( mode ), clusterSettings, log );
214-
default:
215-
throw new UnsupportedOperationException();
216-
}
202+
forget( address );
203+
}
204+
}, log );
217205
}
218206

219-
private synchronized Connection acquireConnection( SessionMode mode )
207+
private Connection acquireConnection( SessionMode mode )
220208
{
221209
if (!discoverable)
222210
{
@@ -228,7 +216,24 @@ private synchronized Connection acquireConnection( SessionMode mode )
228216
{
229217
discover();
230218
}
219+
if ( !endpoints.valid() )
220+
{
221+
discoverEndpoints();
222+
}
231223

224+
switch ( mode )
225+
{
226+
case READ:
227+
return connections.acquire( endpoints.readServer );
228+
case WRITE:
229+
return connections.acquire( endpoints.writeServer );
230+
default:
231+
throw new ClientException( mode + " is not supported for creating new sessions" );
232+
}
233+
}
234+
235+
private synchronized void discoverEndpoints()
236+
{
232237
endpoints.clear();
233238
try
234239
{
@@ -255,7 +260,9 @@ else if ( serverMode.equals( "WRITE" ) )
255260
{
256261
log.warn( "Could not find procedure %s", ACQUIRE_ENDPOINTS );
257262
discoverable = false;
258-
return connections.acquire();
263+
Connection connection = connections.acquire();
264+
endpoints.readServer = connection.address();
265+
endpoints.writeServer = connection.address();
259266
}
260267
throw e;
261268
}
@@ -264,17 +271,6 @@ else if ( serverMode.equals( "WRITE" ) )
264271
{
265272
throw new ServiceUnavailableException("Could not establish any endpoints for the call");
266273
}
267-
268-
269-
switch ( mode )
270-
{
271-
case READ:
272-
return connections.acquire( endpoints.readServer );
273-
case WRITE:
274-
return connections.acquire( endpoints.writeServer );
275-
default:
276-
throw new ClientException( mode + " is not supported for creating new sessions" );
277-
}
278274
}
279275

280276
@Override
@@ -292,8 +288,8 @@ public void close()
292288

293289
private static class Endpoints
294290
{
295-
BoltServerAddress readServer;
296-
BoltServerAddress writeServer;
291+
private BoltServerAddress readServer;
292+
private BoltServerAddress writeServer;
297293

298294
public boolean valid()
299295
{
@@ -305,6 +301,22 @@ public void clear()
305301
readServer = null;
306302
writeServer = null;
307303
}
304+
305+
boolean contains(BoltServerAddress address)
306+
{
307+
if (readServer != null && writeServer != null)
308+
{
309+
return readServer.equals( address ) || writeServer.equals( address );
310+
}
311+
else if ( readServer != null )
312+
{
313+
return readServer.equals( address );
314+
}
315+
else
316+
{
317+
return writeServer != null && writeServer.equals( address );
318+
}
319+
}
308320
}
309321

310322
}

driver/src/main/java/org/neo4j/driver/internal/ClusterSettings.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,23 +23,16 @@
2323

2424
public class ClusterSettings
2525
{
26-
private final int readRetry;
2726
private final int minimumNumberOfServers;
2827

29-
public ClusterSettings( int readRetry, int minimumNumberOfServers )
28+
public ClusterSettings( int minimumNumberOfServers )
3029
{
31-
this.readRetry = readRetry;
3230
this.minimumNumberOfServers = minimumNumberOfServers;
3331
}
3432

3533
public static ClusterSettings fromConfig( Config config )
3634
{
37-
return new ClusterSettings( config.maximumReadRetriesForCluster(), config.minimumKnownClusterSize() ) ;
38-
}
39-
40-
public int readRetry()
41-
{
42-
return readRetry;
35+
return new ClusterSettings( config.minimumKnownClusterSize() ) ;
4336
}
4437

4538
public int minimumNumberOfServers()

driver/src/main/java/org/neo4j/driver/internal/WriteNetworkSession.java renamed to driver/src/main/java/org/neo4j/driver/internal/ClusteredNetworkSession.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,30 +19,35 @@
1919
package org.neo4j.driver.internal;
2020

2121

22+
import org.neo4j.driver.internal.net.BoltServerAddress;
2223
import org.neo4j.driver.internal.spi.Connection;
24+
import org.neo4j.driver.internal.util.Consumer;
2325
import org.neo4j.driver.v1.Logger;
2426
import org.neo4j.driver.v1.Statement;
2527
import org.neo4j.driver.v1.StatementResult;
2628
import org.neo4j.driver.v1.exceptions.ConnectionFailureException;
2729
import org.neo4j.driver.v1.exceptions.SessionExpiredException;
2830

29-
public class WriteNetworkSession extends NetworkSession
31+
public class ClusteredNetworkSession extends NetworkSession
3032
{
33+
private final Consumer<BoltServerAddress> onFailedConnection;
3134

32-
WriteNetworkSession(Connection connection, ClusterSettings clusterSettings, Logger logger )
35+
ClusteredNetworkSession( Connection connection, ClusterSettings clusterSettings, Consumer<BoltServerAddress> onFailedConnection, Logger logger )
3336
{
34-
super(connection, logger);
37+
super( connection, logger );
38+
this.onFailedConnection = onFailedConnection;
3539
}
3640

3741
@Override
3842
public StatementResult run( Statement statement )
3943
{
4044
try
4145
{
42-
return super.run( statement );
46+
return new ClusteredStatementResult( super.run( statement ), connection.address(), onFailedConnection );
4347
}//TODO we need to catch exceptions due to leader switches etc here
4448
catch ( ConnectionFailureException e )
4549
{
50+
onFailedConnection.accept( connection.address() );
4651
throw new SessionExpiredException( "Failed to perform write load to server", e );
4752
}
4853

driver/src/main/java/org/neo4j/driver/internal/ReadNetworkSession.java

Lines changed: 0 additions & 64 deletions
This file was deleted.

driver/src/main/java/org/neo4j/driver/v1/Config.java

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323

2424
import org.neo4j.driver.internal.logging.JULogging;
2525
import org.neo4j.driver.internal.net.pooling.PoolSettings;
26-
import org.neo4j.driver.v1.exceptions.ConnectionFailureException;
2726
import org.neo4j.driver.v1.util.Immutable;
2827

2928
import static java.lang.System.getProperty;
@@ -64,7 +63,6 @@ public class Config
6463
private final TrustStrategy trustStrategy;
6564

6665
private final int minServersInCluster;
67-
private final int readRetries;
6866

6967
private Config( ConfigBuilder builder)
7068
{
@@ -77,7 +75,6 @@ private Config( ConfigBuilder builder)
7775
this.encryptionLevel = builder.encryptionLevel;
7876
this.trustStrategy = builder.trustStrategy;
7977
this.minServersInCluster = builder.minServersInCluster;
80-
this.readRetries = builder.readRetries;
8178
}
8279

8380
/**
@@ -134,14 +131,6 @@ public TrustStrategy trustStrategy()
134131
return trustStrategy;
135132
}
136133

137-
/**
138-
* @return the number of retries to be attempted for read sessions
139-
*/
140-
public int maximumReadRetriesForCluster()
141-
{
142-
return readRetries;
143-
}
144-
145134
/**
146135
* @return the minimum number of servers the driver should know about.
147136
*/
@@ -180,7 +169,6 @@ public static class ConfigBuilder
180169
private TrustStrategy trustStrategy = trustOnFirstUse(
181170
new File( getProperty( "user.home" ), ".neo4j" + File.separator + "known_hosts" ) );
182171
public int minServersInCluster = 3;
183-
public int readRetries = 3;
184172

185173
private ConfigBuilder() {}
186174

@@ -285,21 +273,6 @@ public ConfigBuilder withTrustStrategy( TrustStrategy trustStrategy )
285273
return this;
286274
}
287275

288-
/**
289-
* For read queries the driver can do automatic retries upon server failures,
290-
*
291-
* This setting specifies how many retries that should be attempted before giving up
292-
* and throw a {@link ConnectionFailureException}. If not specified this setting defaults to 3 retries before
293-
* giving up.
294-
* @param retries The number or retries to attempt before giving up.
295-
* @return this builder
296-
*/
297-
public ConfigBuilder withMaximumReadRetriesForCluster( int retries )
298-
{
299-
this.readRetries = retries;
300-
return this;
301-
}
302-
303276
/**
304277
* Specifies the minimum numbers in a cluster a driver should know about.
305278
* <p>

0 commit comments

Comments
 (0)