Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.neo4j.driver.v1.exceptions.SessionExpiredException;
import org.neo4j.driver.v1.net.ServerAddressResolver;

import static java.lang.String.format;
import static java.util.concurrent.CompletableFuture.completedFuture;

public class LoadBalancer implements ConnectionProvider, RoutingErrorHandler
Expand Down Expand Up @@ -221,7 +222,8 @@ private void acquire( AccessMode mode, AddressSet addresses, CompletableFuture<C
{
if ( error instanceof ServiceUnavailableException )
{
log.error( "Failed to obtain a connection towards address " + address, error );
SessionExpiredException errorToLog = new SessionExpiredException( format( "Server at %s is no longer available", address ), error );
log.warn( "Failed to obtain a connection towards address " + address, errorToLog );
forget( address );
eventExecutorGroup.next().execute( () -> acquire( mode, addresses, result ) );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import org.neo4j.driver.v1.Config;
import org.neo4j.driver.v1.Driver;
import org.neo4j.driver.v1.GraphDatabase;
import org.neo4j.driver.v1.Logger;
import org.neo4j.driver.v1.Logging;
import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.Session;
import org.neo4j.driver.v1.StatementResult;
Expand All @@ -61,7 +63,9 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.startsWith;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.neo4j.driver.v1.Logging.none;
Expand Down Expand Up @@ -726,6 +730,40 @@ void shouldRetryWriteTransactionUntilSuccess() throws Exception
}
}

@Test
void shouldRetryWriteTransactionUntilSuccessWithWhenLeaderIsRemoved() throws Exception
{
// This test simulates a router in a cluster when a leader is removed.
// The router first returns a RT with a writer inside.
// However this writer is killed while the driver is running a tx with it.
// Then at the second time the router returns the same RT with the killed writer inside.
// At the third round, the router removes the the writer server from RT reply.
// Finally, the router returns a RT with a reachable writer.
StubServer router = StubServer.start( "acquire_endpoints_v3_leader_killed.script", 9001 );
StubServer brokenWriter = StubServer.start( "dead_write_server.script", 9004 );
StubServer writer = StubServer.start( "write_server.script", 9008 );

Logger logger = mock( Logger.class );
Config config = Config.builder().withoutEncryption().withLogging( mockedLogging( logger ) ).build();
try ( Driver driver = newDriverWithSleeplessClock( "bolt+routing://127.0.0.1:9001", config );
Session session = driver.session() )
{
AtomicInteger invocations = new AtomicInteger();
List<Record> records = session.writeTransaction( queryWork( "CREATE (n {name:'Bob'})", invocations ) );

assertEquals( 0, records.size() );
assertEquals( 2, invocations.get() );
}
finally
{
assertEquals( 0, router.exitStatus() );
assertEquals( 0, brokenWriter.exitStatus() );
assertEquals( 0, writer.exitStatus() );
}
verify( logger, times( 3 ) ).warn( startsWith( "Transaction failed and will be retried in" ), any( SessionExpiredException.class ) );
verify( logger ).warn( startsWith( "Failed to obtain a connection towards address 127.0.0.1:9004" ), any( SessionExpiredException.class ) );
}

@Test
void shouldRetryReadTransactionUntilFailure() throws Exception
{
Expand Down Expand Up @@ -1159,19 +1197,24 @@ void useSessionAfterDriverIsClosed() throws Exception
}
}

private static Driver newDriverWithSleeplessClock( String uriString )
private static Driver newDriverWithSleeplessClock( String uriString, Config config )
{
DriverFactory driverFactory = new DriverFactoryWithClock( new SleeplessClock() );
return newDriver( uriString, driverFactory );
return newDriver( uriString, driverFactory, config );
}

private static Driver newDriverWithSleeplessClock( String uriString )
{
return newDriverWithSleeplessClock( uriString, config );
}

private static Driver newDriverWithFixedRetries( String uriString, int retries )
{
DriverFactory driverFactory = new DriverFactoryWithFixedRetryLogic( retries );
return newDriver( uriString, driverFactory );
return newDriver( uriString, driverFactory, config );
}

private static Driver newDriver( String uriString, DriverFactory driverFactory )
private static Driver newDriver( String uriString, DriverFactory driverFactory, Config config )
{
URI uri = URI.create( uriString );
RoutingSettings routingConf = new RoutingSettings( 1, 1, null );
Expand Down Expand Up @@ -1201,4 +1244,11 @@ private static List<String> readStrings( final String query, Session session )
return names;
} );
}

private static Logging mockedLogging( Logger logger )
{
Logging logging = mock( Logging.class );
when( logging.getLog( any() ) ).thenReturn( logger );
return logging;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
!: BOLT 3
!: AUTO RESET

C: HELLO {"scheme": "none", "user_agent": "neo4j-java/dev"}
S: SUCCESS {"server": "Neo4j/9.9.9", "connection_id": "bolt-123456789"}
C: RUN "CALL dbms.cluster.routing.getRoutingTable({context})" {"context": {}} {}
PULL_ALL
S: SUCCESS {"fields": ["ttl", "servers"]}
RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9004"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9005","127.0.0.1:9006"], "role": "READ"},{"addresses": ["127.0.0.1:9001"], "role": "ROUTE"}]]
SUCCESS {}
C: RUN "CALL dbms.cluster.routing.getRoutingTable({context})" {"context": {}} {}
PULL_ALL
S: SUCCESS {"fields": ["ttl", "servers"]}
RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9004"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9005","127.0.0.1:9006"], "role": "READ"},{"addresses": ["127.0.0.1:9001"], "role": "ROUTE"}]]
SUCCESS {}
C: RUN "CALL dbms.cluster.routing.getRoutingTable({context})" {"context": {}} {}
PULL_ALL
S: SUCCESS {"fields": ["ttl", "servers"]}
RECORD [9223372036854775807, [{"addresses": [],"role": "WRITE"}, {"addresses": ["127.0.0.1:9005","127.0.0.1:9006"], "role": "READ"},{"addresses": ["127.0.0.1:9001"], "role": "ROUTE"}]]
SUCCESS {}
C: RUN "CALL dbms.cluster.routing.getRoutingTable({context})" {"context": {}} {}
PULL_ALL
S: SUCCESS {"fields": ["ttl", "servers"]}
RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9008"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9005","127.0.0.1:9006"], "role": "READ"},{"addresses": ["127.0.0.1:9001"], "role": "ROUTE"}]]
SUCCESS {}