diff --git a/driver/src/main/java/org/neo4j/driver/internal/BaseDriver.java b/driver/src/main/java/org/neo4j/driver/internal/BaseDriver.java index 7db7ab24c0..b2d82c8b3f 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/BaseDriver.java +++ b/driver/src/main/java/org/neo4j/driver/internal/BaseDriver.java @@ -19,11 +19,7 @@ package org.neo4j.driver.internal; -import java.util.Set; - -import org.neo4j.driver.internal.net.BoltServerAddress; import org.neo4j.driver.internal.security.SecurityPlan; -import org.neo4j.driver.internal.spi.ConnectionPool; import org.neo4j.driver.v1.Driver; import org.neo4j.driver.v1.Logger; import org.neo4j.driver.v1.Logging; @@ -33,12 +29,9 @@ abstract class BaseDriver implements Driver { private final SecurityPlan securityPlan; protected final Logger log; - protected final ConnectionPool connections; - BaseDriver( ConnectionPool connections, BoltServerAddress address, SecurityPlan securityPlan, Logging logging ) + BaseDriver( SecurityPlan securityPlan, Logging logging ) { - this.connections = connections; - this.connections.add( address ); this.securityPlan = securityPlan; this.log = logging.getLog( Session.LOG_NAME ); } @@ -49,9 +42,4 @@ public boolean isEncrypted() return securityPlan.requiresEncryption(); } - //Used for testing - Set servers() - { - return connections.addresses(); - } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/ClusterDriver.java b/driver/src/main/java/org/neo4j/driver/internal/ClusterDriver.java index bc2d922fee..c8945544e6 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/ClusterDriver.java +++ b/driver/src/main/java/org/neo4j/driver/internal/ClusterDriver.java @@ -18,63 +18,108 @@ */ package org.neo4j.driver.internal; -import java.util.List; +import java.util.Collections; +import java.util.Comparator; +import java.util.Set; import org.neo4j.driver.internal.net.BoltServerAddress; -import org.neo4j.driver.internal.net.pooling.PoolSettings; -import org.neo4j.driver.internal.net.pooling.SocketConnectionPool; import org.neo4j.driver.internal.security.SecurityPlan; import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.spi.ConnectionPool; +import org.neo4j.driver.internal.util.ConcurrentRoundRobinSet; import org.neo4j.driver.internal.util.Consumer; -import org.neo4j.driver.internal.util.Supplier; +import org.neo4j.driver.v1.AccessMode; +import org.neo4j.driver.v1.Logger; import org.neo4j.driver.v1.Logging; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Session; -import org.neo4j.driver.v1.SessionMode; import org.neo4j.driver.v1.StatementResult; import org.neo4j.driver.v1.exceptions.ClientException; import org.neo4j.driver.v1.exceptions.ConnectionFailureException; import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; +import org.neo4j.driver.v1.util.BiFunction; import static java.lang.String.format; public class ClusterDriver extends BaseDriver { - private static final String DISCOVER_MEMBERS = "dbms.cluster.discoverEndpointAcquisitionServers"; - private static final String ACQUIRE_ENDPOINTS = "dbms.cluster.acquireEndpoints"; + private static final String GET_SERVERS = "dbms.cluster.routing.getServers"; + private final static Comparator COMPARATOR = new Comparator() + { + @Override + public int compare( BoltServerAddress o1, BoltServerAddress o2 ) + { + int compare = o1.host().compareTo( o2.host() ); + if (compare == 0) + { + compare = Integer.compare( o1.port(), o2.port() ); + } + + return compare; + } + }; + private static final int MIN_SERVERS = 2; + private final ConnectionPool connections; + private final BiFunction sessionProvider; - private final Endpoints endpoints = new Endpoints(); - private final ClusterSettings clusterSettings; - private boolean discoverable = true; + private final ConcurrentRoundRobinSet routingServers = new ConcurrentRoundRobinSet<>(COMPARATOR); + private final ConcurrentRoundRobinSet readServers = new ConcurrentRoundRobinSet<>(COMPARATOR); + private final ConcurrentRoundRobinSet writeServers = new ConcurrentRoundRobinSet<>(COMPARATOR); - public ClusterDriver( BoltServerAddress seedAddress, ConnectionSettings connectionSettings, - ClusterSettings clusterSettings, + public ClusterDriver( BoltServerAddress seedAddress, + ConnectionPool connections, SecurityPlan securityPlan, - PoolSettings poolSettings, Logging logging ) + BiFunction sessionProvider, + Logging logging ) { - super( new SocketConnectionPool( connectionSettings, securityPlan, poolSettings, logging ),seedAddress, securityPlan, logging ); - this.clusterSettings = clusterSettings; - discover(); + super( securityPlan, logging ); + routingServers.add( seedAddress ); + this.connections = connections; + this.sessionProvider = sessionProvider; + checkServers(); } - synchronized void discover() + private void checkServers() { - if (!discoverable) + synchronized ( routingServers ) { - return; + if ( routingServers.size() < MIN_SERVERS || + readServers.isEmpty() || + writeServers.isEmpty()) + { + getServers(); + } } + } + //must be called from a synchronized block + private void getServers() + { + BoltServerAddress address = null; try { boolean success = false; - while ( !connections.isEmpty() && !success ) + while ( !routingServers.isEmpty() && !success ) { - success = call( DISCOVER_MEMBERS, new Consumer() + address = routingServers.hop(); + success = call( address, GET_SERVERS, new Consumer() { @Override public void accept( Record record ) { - connections.add(new BoltServerAddress( record.get( "address" ).asString() )); + BoltServerAddress newAddress = new BoltServerAddress( record.get( "address" ).asString() ); + switch ( record.get( "mode" ).asString().toUpperCase() ) + { + case "READ": + readServers.add( newAddress ); + break; + case "WRITE": + writeServers.add( newAddress ); + break; + case "ROUTE": + routingServers.add( newAddress ); + break; + } } } ); } @@ -89,8 +134,10 @@ public void accept( Record record ) { //no procedure there, not much to do, stick with what we've got //this may happen because server is running in standalone mode - log.warn( "Could not find procedure %s", DISCOVER_MEMBERS ); - discoverable = false; + this.close(); + throw new ServiceUnavailableException( + String.format( "Server %s couldn't perform discovery", + address == null ? "`UNKNOWN`" : address.toString()), ex ); } else { @@ -100,13 +147,14 @@ public void accept( Record record ) } //must be called from a synchronized method - private boolean call( String procedureName, Consumer recorder ) + private boolean call( BoltServerAddress address, String procedureName, Consumer recorder ) { Connection acquire = null; Session session = null; - try { - acquire = connections.acquire(); - session = new NetworkSession( acquire, log ); + try + { + acquire = connections.acquire(address); + session = sessionProvider.apply( acquire, log ); StatementResult records = session.run( format( "CALL %s", procedureName ) ); while ( records.hasNext() ) @@ -116,162 +164,70 @@ private boolean call( String procedureName, Consumer recorder ) } catch ( ConnectionFailureException e ) { - if (acquire != null) - { - forget( acquire.address() ); - } + forget( address ); return false; } finally { - if (acquire != null) - { - acquire.close(); - } - if (session != null) + if ( session != null ) { session.close(); } - } - return true; - } - - //must be called from a synchronized method - private void callWithRetry(String procedureName, Consumer recorder ) - { - while ( !connections.isEmpty() ) - { - Connection acquire = null; - Session session = null; - try { - acquire = connections.acquire(); - session = new NetworkSession( acquire, log ); - List list = session.run( format( "CALL %s", procedureName ) ).list(); - for ( Record record : list ) - { - recorder.accept( record ); - } - //we found results give up - return; - } - catch ( ConnectionFailureException e ) + if ( acquire != null ) { - if (acquire != null) - { - forget( acquire.address() ); - } - } - finally - { - if (acquire != null) - { - acquire.close(); - } - if (session != null) - { - session.close(); - } + acquire.close(); } - } - throw new ServiceUnavailableException( "Failed to communicate with any of the cluster members" ); + } + return true; } private synchronized void forget( BoltServerAddress address ) { connections.purge( address ); + routingServers.remove( address ); + readServers.remove( address ); + writeServers.remove( address ); } @Override public Session session() { - return session( SessionMode.WRITE ); + return session( AccessMode.WRITE ); } @Override - public Session session( final SessionMode mode ) + public Session session( final AccessMode mode ) { - switch ( mode ) - { - case READ: - return new ReadNetworkSession( new Supplier() - { - @Override - public Connection get() - { - return acquireConnection( mode ); - } - }, new Consumer() - { - @Override - public void accept( Connection connection ) + return new ClusteredNetworkSession( acquireConnection( mode ), + new ClusteredErrorHandler() { - forget( connection.address() ); - } - }, clusterSettings, log ); - case WRITE: - return new WriteNetworkSession( acquireConnection( mode ), clusterSettings, log ); - default: - throw new UnsupportedOperationException(); - } - } - - private synchronized Connection acquireConnection( SessionMode mode ) - { - if (!discoverable) - { - return connections.acquire(); - } - - //if we are short on servers, find new ones - if ( connections.addressCount() < clusterSettings.minimumNumberOfServers() ) - { - discover(); - } - - endpoints.clear(); - try - { - callWithRetry( ACQUIRE_ENDPOINTS, new Consumer() - { - @Override - public void accept( Record record ) - { - String serverMode = record.get( "role" ).asString(); - if ( serverMode.equals( "READ" ) ) + @Override + public void onConnectionFailure( BoltServerAddress address ) { - endpoints.readServer = new BoltServerAddress( record.get( "address" ).asString() ); + forget( address ); } - else if ( serverMode.equals( "WRITE" ) ) + + @Override + public void onWriteFailure( BoltServerAddress address ) { - endpoints.writeServer = new BoltServerAddress( record.get( "address" ).asString() ); + writeServers.remove( address ); } - } - } ); - } - catch (ClientException e) - { - if ( e.code().equals( "Neo.ClientError.Procedure.ProcedureNotFound" ) ) - { - log.warn( "Could not find procedure %s", ACQUIRE_ENDPOINTS ); - discoverable = false; - return connections.acquire(); - } - throw e; - } - - if ( !endpoints.valid() ) - { - throw new ServiceUnavailableException("Could not establish any endpoints for the call"); - } + }, + log ); + } + private Connection acquireConnection( AccessMode mode ) + { + //Potentially rediscover servers if we are not happy with our current knowledge + checkServers(); switch ( mode ) { case READ: - return connections.acquire( endpoints.readServer ); + return connections.acquire( readServers.hop() ); case WRITE: - return connections.acquire( endpoints.writeServer ); + return connections.acquire( writeServers.hop() ); default: throw new ClientException( mode + " is not supported for creating new sessions" ); } @@ -290,21 +246,28 @@ public void close() } } - private static class Endpoints + //For testing + Set routingServers() + { + return Collections.unmodifiableSet( routingServers ); + } + + //For testing + Set readServers() { - BoltServerAddress readServer; - BoltServerAddress writeServer; + return Collections.unmodifiableSet(readServers); + } - public boolean valid() - { - return readServer != null && writeServer != null; - } + //For testing + Set writeServers() + { + return Collections.unmodifiableSet( writeServers); + } - public void clear() - { - readServer = null; - writeServer = null; - } + //For testing + ConnectionPool connectionPool() + { + return connections; } } \ No newline at end of file diff --git a/driver/src/main/java/org/neo4j/driver/internal/ClusterSettings.java b/driver/src/main/java/org/neo4j/driver/internal/ClusteredErrorHandler.java similarity index 52% rename from driver/src/main/java/org/neo4j/driver/internal/ClusterSettings.java rename to driver/src/main/java/org/neo4j/driver/internal/ClusteredErrorHandler.java index d3c24254c0..09ae15ccee 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/ClusterSettings.java +++ b/driver/src/main/java/org/neo4j/driver/internal/ClusteredErrorHandler.java @@ -16,34 +16,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.neo4j.driver.internal; -import org.neo4j.driver.v1.Config; +import org.neo4j.driver.internal.net.BoltServerAddress; -public class ClusterSettings +/** + * Interface used for tracking errors when connected to a cluster. + */ +interface ClusteredErrorHandler { - private final int readRetry; - private final int minimumNumberOfServers; - - public ClusterSettings( int readRetry, int minimumNumberOfServers ) - { - this.readRetry = readRetry; - this.minimumNumberOfServers = minimumNumberOfServers; - } - - public static ClusterSettings fromConfig( Config config ) - { - return new ClusterSettings( config.maximumReadRetriesForCluster(), config.minimumKnownClusterSize() ) ; - } - - public int readRetry() - { - return readRetry; - } + void onConnectionFailure( BoltServerAddress address ); - public int minimumNumberOfServers() - { - return minimumNumberOfServers; - } + void onWriteFailure( BoltServerAddress address ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/ClusteredNetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/ClusteredNetworkSession.java new file mode 100644 index 0000000000..2c3d21f504 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/ClusteredNetworkSession.java @@ -0,0 +1,84 @@ +/** + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal; + + +import org.neo4j.driver.internal.net.BoltServerAddress; +import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.v1.Logger; +import org.neo4j.driver.v1.Statement; +import org.neo4j.driver.v1.StatementResult; +import org.neo4j.driver.v1.exceptions.ClientException; +import org.neo4j.driver.v1.exceptions.ConnectionFailureException; +import org.neo4j.driver.v1.exceptions.SessionExpiredException; + +public class ClusteredNetworkSession extends NetworkSession +{ + private final ClusteredErrorHandler onError; + + ClusteredNetworkSession( Connection connection, + ClusteredErrorHandler onError, Logger logger ) + { + super( connection, logger ); + this.onError = onError; + } + + @Override + public StatementResult run( Statement statement ) + { + try + { + return new ClusteredStatementResult( super.run( statement ), connection.address(), onError ); + } + catch ( ConnectionFailureException e ) + { + onError.onConnectionFailure( connection.address() ); + throw new SessionExpiredException( "Failed to perform write load to server", e ); + } + catch ( ClientException e ) + { + if ( e.code().equals( "Neo.ClientError.General.ForbiddenOnFollower" ) ) + { + onError.onWriteFailure( connection.address() ); + throw new SessionExpiredException( + String.format( "Server at %s no longer accepts writes", connection.address().toString() ) ); + } + else + { + throw e; + } + } + } + + @Override + public void close() + { + try + { + super.close(); + } + catch ( ConnectionFailureException e ) + { + BoltServerAddress address = connection.address(); + onError.onConnectionFailure( address ); + throw new SessionExpiredException( + String.format( "Server at %s is no longer available", address.toString() ), e ); + } + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/ClusteredStatementResult.java b/driver/src/main/java/org/neo4j/driver/internal/ClusteredStatementResult.java new file mode 100644 index 0000000000..af80164d70 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/ClusteredStatementResult.java @@ -0,0 +1,261 @@ +/** + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal; + +import java.util.List; + +import org.neo4j.driver.internal.net.BoltServerAddress; +import org.neo4j.driver.v1.Record; +import org.neo4j.driver.v1.StatementResult; +import org.neo4j.driver.v1.exceptions.ClientException; +import org.neo4j.driver.v1.exceptions.ConnectionFailureException; +import org.neo4j.driver.v1.exceptions.NoSuchRecordException; +import org.neo4j.driver.v1.exceptions.SessionExpiredException; +import org.neo4j.driver.v1.summary.ResultSummary; +import org.neo4j.driver.v1.util.Function; + +public class ClusteredStatementResult implements StatementResult +{ + private final StatementResult delegate; + private final BoltServerAddress address; + private final ClusteredErrorHandler onError; + + ClusteredStatementResult( StatementResult delegate, BoltServerAddress address, ClusteredErrorHandler onError ) + { + this.delegate = delegate; + this.address = address; + this.onError = onError; + } + + @Override + public List keys() + { + try + { + return delegate.keys(); + } + catch ( ConnectionFailureException e ) + { + throw sessionExpired( e ); + } + catch ( ClientException e ) + { + if ( isFailedToWrite( e ) ) + { + throw failedWrite(); + } + else + { + throw e; + } + } + } + + @Override + public boolean hasNext() + { + try + { + return delegate.hasNext(); + } + catch ( ConnectionFailureException e ) + { + throw sessionExpired( e ); + } + catch ( ClientException e ) + { + if ( isFailedToWrite( e ) ) + { + throw failedWrite(); + } + else + { + throw e; + } + } + } + + @Override + public Record next() + { + try + { + return delegate.next(); + } + catch ( ConnectionFailureException e ) + { + throw sessionExpired( e ); + } + catch ( ClientException e ) + { + if ( isFailedToWrite( e ) ) + { + throw failedWrite(); + } + else + { + throw e; + } + } + } + + + @Override + public Record single() throws NoSuchRecordException + { + try + { + return delegate.single(); + } + catch ( ConnectionFailureException e ) + { + throw sessionExpired( e ); + } + catch ( ClientException e ) + { + if ( isFailedToWrite( e ) ) + { + throw failedWrite(); + } + else + { + throw e; + } + } + } + + @Override + public Record peek() + { + try + { + return delegate.peek(); + } + catch ( ConnectionFailureException e ) + { + throw sessionExpired( e ); + } + catch ( ClientException e ) + { + if ( isFailedToWrite( e ) ) + { + throw failedWrite(); + } + else + { + throw e; + } + } + } + + @Override + public List list() + { + try + { + return delegate.list(); + } + catch ( ConnectionFailureException e ) + { + throw sessionExpired( e ); + } + catch ( ClientException e ) + { + if ( isFailedToWrite( e ) ) + { + throw failedWrite(); + } + else + { + throw e; + } + } + } + + @Override + public List list( Function mapFunction ) + { + try + { + return delegate.list(mapFunction); + } + catch ( ConnectionFailureException e ) + { + throw sessionExpired( e ); + } + catch ( ClientException e ) + { + if ( isFailedToWrite( e ) ) + { + throw failedWrite(); + } + else + { + throw e; + } + } + } + + @Override + public void remove() + { + throw new ClientException( "Removing records from a result is not supported." ); + } + + @Override + public ResultSummary consume() + { + try + { + return delegate.consume(); + } + catch ( ConnectionFailureException e ) + { + throw sessionExpired( e ); + } + catch ( ClientException e ) + { + if ( isFailedToWrite( e ) ) + { + throw failedWrite(); + } + else + { + throw e; + } + } + } + + private SessionExpiredException sessionExpired( ConnectionFailureException e ) + { + onError.onConnectionFailure( address ); + return new SessionExpiredException( String.format( "Server at %s is no longer available", address.toString()), e); + } + + private SessionExpiredException failedWrite() + { + onError.onWriteFailure( address ); + return new SessionExpiredException( String.format( "Server at %s no longer accepts writes", address.toString())); + } + + private boolean isFailedToWrite( ClientException e ) + { + return e.code().equals( "Neo.ClientError.General.ForbiddenOnFollower" ); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/DirectDriver.java b/driver/src/main/java/org/neo4j/driver/internal/DirectDriver.java index 8f38a3cf05..6b1aa35393 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/DirectDriver.java +++ b/driver/src/main/java/org/neo4j/driver/internal/DirectDriver.java @@ -16,35 +16,38 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.neo4j.driver.internal; import org.neo4j.driver.internal.net.BoltServerAddress; -import org.neo4j.driver.internal.net.pooling.PoolSettings; -import org.neo4j.driver.internal.net.pooling.SocketConnectionPool; import org.neo4j.driver.internal.security.SecurityPlan; +import org.neo4j.driver.internal.spi.ConnectionPool; +import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.Logging; import org.neo4j.driver.v1.Session; -import org.neo4j.driver.v1.SessionMode; import static java.lang.String.format; public class DirectDriver extends BaseDriver { - public DirectDriver( BoltServerAddress address, ConnectionSettings connectionSettings, SecurityPlan securityPlan, - PoolSettings poolSettings, Logging logging ) + protected final ConnectionPool connections; + private final BoltServerAddress address; + + public DirectDriver( BoltServerAddress address, ConnectionPool connections, SecurityPlan securityPlan, + Logging logging ) { - super( new SocketConnectionPool( connectionSettings, securityPlan, poolSettings, logging ), address, securityPlan, logging ); + super( securityPlan, logging ); + this.connections = connections; + this.address = address; } @Override public Session session() { - return new NetworkSession( connections.acquire(), log ); + return new NetworkSession( connections.acquire( address ), log ); } @Override - public Session session( SessionMode ignore ) + public Session session( AccessMode ignore ) { return session(); } @@ -61,4 +64,9 @@ public void close() log.error( format( "~~ [ERROR] %s", ex.getMessage() ), ex ); } } + + BoltServerAddress server() + { + return address; + } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java index 927091ad6d..9f89c93894 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java @@ -60,7 +60,7 @@ public void run() private ExplicitTransaction currentTransaction; private AtomicBoolean isOpen = new AtomicBoolean( true ); - NetworkSession( Connection connection, Logger logger ) + public NetworkSession( Connection connection, Logger logger ) { this.connection = connection; this.logger = logger; diff --git a/driver/src/main/java/org/neo4j/driver/internal/ReadNetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/ReadNetworkSession.java deleted file mode 100644 index 65a353457f..0000000000 --- a/driver/src/main/java/org/neo4j/driver/internal/ReadNetworkSession.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Copyright (c) 2002-2016 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.driver.internal; - - -import org.neo4j.driver.internal.spi.Connection; -import org.neo4j.driver.internal.util.Consumer; -import org.neo4j.driver.internal.util.Supplier; -import org.neo4j.driver.v1.Logger; -import org.neo4j.driver.v1.Statement; -import org.neo4j.driver.v1.StatementResult; -import org.neo4j.driver.v1.exceptions.ConnectionFailureException; -import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; - -public class ReadNetworkSession extends NetworkSession -{ - private final Supplier connectionSupplier; - private final Consumer failed; - private final ClusterSettings clusterSettings; - - ReadNetworkSession(Supplier connectionSupplier, Consumer failed, - ClusterSettings clusterSettings, Logger logger ) - { - super(connectionSupplier.get(), logger); - this.connectionSupplier = connectionSupplier; - this.clusterSettings = clusterSettings; - this.failed = failed; - } - - @Override - public StatementResult run( Statement statement ) - { - for ( int i = 0; i < clusterSettings.readRetry(); i++ ) - { - try - { - return super.run( statement ); - } - catch ( ConnectionFailureException e ) - { - failed.accept(connection); - connection = connectionSupplier.get(); - } - } - - throw new ServiceUnavailableException( "Not able to connect to any members of the cluster" ); - } -} diff --git a/driver/src/main/java/org/neo4j/driver/internal/WriteNetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/WriteNetworkSession.java deleted file mode 100644 index a7a5a63487..0000000000 --- a/driver/src/main/java/org/neo4j/driver/internal/WriteNetworkSession.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Copyright (c) 2002-2016 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.driver.internal; - - -import org.neo4j.driver.internal.spi.Connection; -import org.neo4j.driver.v1.Logger; -import org.neo4j.driver.v1.Statement; -import org.neo4j.driver.v1.StatementResult; -import org.neo4j.driver.v1.exceptions.ConnectionFailureException; -import org.neo4j.driver.v1.exceptions.SessionExpiredException; - -public class WriteNetworkSession extends NetworkSession -{ - - WriteNetworkSession(Connection connection, ClusterSettings clusterSettings, Logger logger ) - { - super(connection, logger); - } - - @Override - public StatementResult run( Statement statement ) - { - try - { - return super.run( statement ); - }//TODO we need to catch exceptions due to leader switches etc here - catch ( ConnectionFailureException e ) - { - throw new SessionExpiredException( "Failed to perform write load to server", e ); - } - - } -} diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java index a1e18a7eba..722f4e747d 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java @@ -18,11 +18,9 @@ */ package org.neo4j.driver.internal.net.pooling; -import java.util.Comparator; import java.util.Map; -import java.util.Set; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; @@ -55,26 +53,10 @@ */ public class SocketConnectionPool implements ConnectionPool { - /** * Pools, organized by server address. */ - private final ConcurrentSkipListMap> pools = new ConcurrentSkipListMap<>( - - new Comparator() - { - @Override - public int compare( BoltServerAddress o1, BoltServerAddress o2 ) - { - int compare = o1.host().compareTo( o2.host() ); - if (compare == 0) - { - compare = Integer.compare( o1.port(), o2.port() ); - } - - return compare; - } - } ); + private final ConcurrentHashMap> pools = new ConcurrentHashMap<>(); private final Clock clock = Clock.SYSTEM; @@ -83,8 +65,6 @@ public int compare( BoltServerAddress o1, BoltServerAddress o2 ) private final PoolSettings poolSettings; private final Logging logging; - private BoltServerAddress current = null; - /** Shutdown flag */ private final AtomicBoolean stopped = new AtomicBoolean( false ); @@ -139,61 +119,13 @@ public Connection acquire( BoltServerAddress address ) return conn; } - @Override - public Connection acquire() - { - if ( current == null ) - { - current = pools.firstKey(); - } - else - { - current = pools.higherKey( current ); - //We've gone through all connections, start over - if (current == null) - { - current = pools.firstKey(); - } - } - - if ( current == null ) - { - throw new IllegalStateException( "Cannot acquire connection from an empty pool" ); - } - - return acquire( current ); - } - - @Override - public boolean isEmpty() - { - return pools.isEmpty(); - } - - @Override - public int addressCount() - { - return pools.size(); - } - - @Override - public void add( BoltServerAddress address ) - { - pools.putIfAbsent( address, new LinkedBlockingQueue( ) ); - } - - @Override - public Set addresses() - { - return pools.keySet(); - } - private BlockingQueue pool( BoltServerAddress address ) { BlockingQueue pool = pools.get( address ); if ( pool == null ) { pool = new LinkedBlockingQueue<>(poolSettings.maxIdleConnectionPoolSize()); + if ( pools.putIfAbsent( address, pool ) != null ) { // We lost a race to create the pool, dispose of the one we created, and recurse @@ -221,6 +153,12 @@ public void purge( BoltServerAddress address ) } } + @Override + public boolean hasAddress( BoltServerAddress address ) + { + return pools.containsKey( address ); + } + @Override public void close() { diff --git a/driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionPool.java b/driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionPool.java index 4172ed1318..9885ff7028 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionPool.java +++ b/driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionPool.java @@ -19,8 +19,6 @@ package org.neo4j.driver.internal.spi; -import java.util.Set; - import org.neo4j.driver.internal.net.BoltServerAddress; public interface ConnectionPool extends AutoCloseable @@ -33,40 +31,11 @@ public interface ConnectionPool extends AutoCloseable */ Connection acquire( BoltServerAddress address ); - /** - * Acquire a connection to one of the addresses that are currently in the pool - * @return A connection to one of the addresses in the pool - * @throws IllegalStateException if the pool is empty - */ - Connection acquire(); - /** * Removes all connections to a given address from the pool. * @param address The address to remove. */ void purge( BoltServerAddress address ); - /** - * Checks if the connection pool is empty - * @return true if the pool is empty, otherwise false - */ - boolean isEmpty(); - - /** - * Returns the number of addresses stored in the pool. - * @return the current number of addresses stored in the pool - */ - int addressCount(); - - /** - * Adds an address to the pool. - * @param address the address to add - */ - void add( BoltServerAddress address ); - - /** - * Returns all addresses known by the pool - * @return the addresses known by the pool - */ - Set addresses(); + boolean hasAddress( BoltServerAddress address ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/util/ConcurrentRoundRobinSet.java b/driver/src/main/java/org/neo4j/driver/internal/util/ConcurrentRoundRobinSet.java new file mode 100644 index 0000000000..666e00e524 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/util/ConcurrentRoundRobinSet.java @@ -0,0 +1,146 @@ +/** + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.util; + +import java.util.Collection; +import java.util.Comparator; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.ConcurrentSkipListSet; + +/** + * A set that exposes a method {@link #hop()} that cycles through the members of the set. + * @param the type of elements in the set + */ +public class ConcurrentRoundRobinSet implements Set +{ + private final ConcurrentSkipListSet set; + private T current; + + public ConcurrentRoundRobinSet() + { + set = new ConcurrentSkipListSet<>(); + } + + public ConcurrentRoundRobinSet( Comparator comparator ) + { + set = new ConcurrentSkipListSet<>( comparator ); + } + + public T hop() + { + if ( current == null ) + { + current = set.first(); + } + else + { + current = set.higher( current ); + //We've gone through all connections, start over + if ( current == null ) + { + current = set.first(); + } + } + + if ( current == null ) + { + throw new IllegalStateException( "nothing in the set" ); + } + + return current; + } + + @Override + public boolean add( T item ) + { + return set.add( item ); + } + + @Override + public boolean containsAll( Collection c ) + { + return set.containsAll( c ); + } + + @Override + public boolean addAll( Collection c ) + { + return set.addAll( c ); + } + + @Override + public boolean retainAll( Collection c ) + { + return set.retainAll( c ); + } + + @Override + public boolean removeAll( Collection c ) + { + return set.retainAll( c ); + } + + @Override + public void clear() + { + set.clear(); + } + + @Override + public boolean remove( Object o ) + { + return set.remove( o ); + } + + public int size() + { + return set.size(); + } + + public boolean isEmpty() + { + return set.isEmpty(); + } + + @Override + public boolean contains( Object o ) + { + return set.contains( o ); + } + + @Override + public Iterator iterator() + { + return set.iterator(); + } + + @Override + public Object[] toArray() + { + return set.toArray(); + } + + @SuppressWarnings( "SuspiciousToArrayCall" ) + @Override + public T1[] toArray( T1[] a ) + { + return set.toArray( a ); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/v1/SessionMode.java b/driver/src/main/java/org/neo4j/driver/v1/AccessMode.java similarity index 96% rename from driver/src/main/java/org/neo4j/driver/v1/SessionMode.java rename to driver/src/main/java/org/neo4j/driver/v1/AccessMode.java index 09f177235b..2edca16e4c 100644 --- a/driver/src/main/java/org/neo4j/driver/v1/SessionMode.java +++ b/driver/src/main/java/org/neo4j/driver/v1/AccessMode.java @@ -18,7 +18,7 @@ */ package org.neo4j.driver.v1; -public enum SessionMode +public enum AccessMode { READ, WRITE diff --git a/driver/src/main/java/org/neo4j/driver/v1/Config.java b/driver/src/main/java/org/neo4j/driver/v1/Config.java index d8bb9a4313..41f743b1dc 100644 --- a/driver/src/main/java/org/neo4j/driver/v1/Config.java +++ b/driver/src/main/java/org/neo4j/driver/v1/Config.java @@ -23,7 +23,6 @@ import org.neo4j.driver.internal.logging.JULogging; import org.neo4j.driver.internal.net.pooling.PoolSettings; -import org.neo4j.driver.v1.exceptions.ConnectionFailureException; import org.neo4j.driver.v1.util.Immutable; import static java.lang.System.getProperty; @@ -49,9 +48,6 @@ public class Config /** User defined logging */ private final Logging logging; - /** The size of connection pool for each database url */ - private final int connectionPoolSize; - private final int maxIdleConnectionPoolSize; /** Connections that have been idle longer than this threshold will have a ping test performed on them. */ @@ -64,20 +60,17 @@ public class Config private final TrustStrategy trustStrategy; private final int minServersInCluster; - private final int readRetries; private Config( ConfigBuilder builder) { this.logging = builder.logging; - this.connectionPoolSize = builder.connectionPoolSize; this.maxIdleConnectionPoolSize = builder.maxIdleConnectionPoolSize; this.idleTimeBeforeConnectionTest = builder.idleTimeBeforeConnectionTest; this.encryptionLevel = builder.encryptionLevel; this.trustStrategy = builder.trustStrategy; this.minServersInCluster = builder.minServersInCluster; - this.readRetries = builder.readRetries; } /** @@ -134,22 +127,6 @@ public TrustStrategy trustStrategy() return trustStrategy; } - /** - * @return the number of retries to be attempted for read sessions - */ - public int maximumReadRetriesForCluster() - { - return readRetries; - } - - /** - * @return the minimum number of servers the driver should know about. - */ - public int minimumKnownClusterSize() - { - return minServersInCluster; - } - /** * Return a {@link ConfigBuilder} instance * @return a {@link ConfigBuilder} instance @@ -173,14 +150,12 @@ public static Config defaultConfig() public static class ConfigBuilder { private Logging logging = new JULogging( Level.INFO ); - private int connectionPoolSize = 50; private int maxIdleConnectionPoolSize = PoolSettings.DEFAULT_MAX_IDLE_CONNECTION_POOL_SIZE; private long idleTimeBeforeConnectionTest = PoolSettings.DEFAULT_IDLE_TIME_BEFORE_CONNECTION_TEST; private EncryptionLevel encryptionLevel = EncryptionLevel.REQUIRED_NON_LOCAL; private TrustStrategy trustStrategy = trustOnFirstUse( new File( getProperty( "user.home" ), ".neo4j" + File.separator + "known_hosts" ) ); public int minServersInCluster = 3; - public int readRetries = 3; private ConfigBuilder() {} @@ -210,7 +185,6 @@ public ConfigBuilder withLogging( Logging logging ) @Deprecated public ConfigBuilder withMaxSessions( int size ) { - this.connectionPoolSize = size; return this; } @@ -285,37 +259,6 @@ public ConfigBuilder withTrustStrategy( TrustStrategy trustStrategy ) return this; } - /** - * For read queries the driver can do automatic retries upon server failures, - * - * This setting specifies how many retries that should be attempted before giving up - * and throw a {@link ConnectionFailureException}. If not specified this setting defaults to 3 retries before - * giving up. - * @param retries The number or retries to attempt before giving up. - * @return this builder - */ - public ConfigBuilder withMaximumReadRetriesForCluster( int retries ) - { - this.readRetries = retries; - return this; - } - - /** - * Specifies the minimum numbers in a cluster a driver should know about. - *

- * Once the number of servers drops below this threshold, the driver will automatically trigger a discovery - * event - * asking the servers for more members. - * - * @param minNumberOfServers the minimum number of servers the driver should know about - * @return this builder - */ - public ConfigBuilder withMinimumKnownClusterSize( int minNumberOfServers ) - { - this.minServersInCluster = minNumberOfServers; - return this; - } - /** * Create a config instance from this builder. * @return a {@link Config} instance diff --git a/driver/src/main/java/org/neo4j/driver/v1/Driver.java b/driver/src/main/java/org/neo4j/driver/v1/Driver.java index 498112467c..54cda57ea2 100644 --- a/driver/src/main/java/org/neo4j/driver/v1/Driver.java +++ b/driver/src/main/java/org/neo4j/driver/v1/Driver.java @@ -86,7 +86,7 @@ public interface Driver extends AutoCloseable */ Session session(); - Session session(SessionMode mode); + Session session(AccessMode mode); /** * Close all the resources assigned to this driver diff --git a/driver/src/main/java/org/neo4j/driver/v1/GraphDatabase.java b/driver/src/main/java/org/neo4j/driver/v1/GraphDatabase.java index e5eddcfab6..8362fd3d45 100644 --- a/driver/src/main/java/org/neo4j/driver/v1/GraphDatabase.java +++ b/driver/src/main/java/org/neo4j/driver/v1/GraphDatabase.java @@ -23,13 +23,17 @@ import java.security.GeneralSecurityException; import org.neo4j.driver.internal.ClusterDriver; -import org.neo4j.driver.internal.ClusterSettings; import org.neo4j.driver.internal.ConnectionSettings; import org.neo4j.driver.internal.DirectDriver; +import org.neo4j.driver.internal.NetworkSession; import org.neo4j.driver.internal.net.BoltServerAddress; import org.neo4j.driver.internal.net.pooling.PoolSettings; +import org.neo4j.driver.internal.net.pooling.SocketConnectionPool; import org.neo4j.driver.internal.security.SecurityPlan; +import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.spi.ConnectionPool; import org.neo4j.driver.v1.exceptions.ClientException; +import org.neo4j.driver.v1.util.BiFunction; import static java.lang.String.format; import static org.neo4j.driver.internal.security.SecurityPlan.insecure; @@ -43,6 +47,17 @@ */ public class GraphDatabase { + + private static final BiFunction + SESSION_PROVIDER = new BiFunction() + { + @Override + public Session apply( Connection connection, Logger logger ) + { + return new NetworkSession( connection, logger ); + } + }; + /** * Return a driver for a Neo4j instance with the default configuration settings * @@ -145,7 +160,7 @@ public static Driver driver( URI uri, AuthToken authToken, Config config ) new ConnectionSettings( authToken == null ? AuthTokens.none() : authToken ); // Make sure we have some configuration to play with - if (config == null) + if ( config == null ) { config = Config.defaultConfig(); } @@ -167,12 +182,14 @@ public static Driver driver( URI uri, AuthToken authToken, Config config ) config.idleTimeBeforeConnectionTest() ); // And finally, construct the driver proper + ConnectionPool connectionPool = + new SocketConnectionPool( connectionSettings, securityPlan, poolSettings, config.logging() ); switch ( scheme.toLowerCase() ) { case "bolt": - return new DirectDriver( address, connectionSettings, securityPlan, poolSettings, config.logging() ); - case "bolt+discovery": - return new ClusterDriver( address, connectionSettings, ClusterSettings.fromConfig( config ), securityPlan, poolSettings, config.logging() ); + return new DirectDriver( address, connectionPool, securityPlan, config.logging() ); + case "bolt+routing": + return new ClusterDriver( address, connectionPool, securityPlan, SESSION_PROVIDER, config.logging() ); default: throw new ClientException( format( "Unsupported URI scheme: %s", scheme ) ); } @@ -187,7 +204,7 @@ private static SecurityPlan createSecurityPlan( BoltServerAddress address, Confi { Config.EncryptionLevel encryptionLevel = config.encryptionLevel(); boolean requiresEncryption = encryptionLevel.equals( REQUIRED ) || - (encryptionLevel.equals( REQUIRED_NON_LOCAL ) && !address.isLocal() ); + (encryptionLevel.equals( REQUIRED_NON_LOCAL ) && !address.isLocal()); if ( requiresEncryption ) { @@ -195,16 +212,19 @@ private static SecurityPlan createSecurityPlan( BoltServerAddress address, Confi switch ( config.trustStrategy().strategy() ) { case TRUST_SIGNED_CERTIFICATES: - logger.warn( "Option `TRUST_SIGNED_CERTIFICATE` has been deprecated and will be removed in a future version " + - "of the driver. Please switch to use `TRUST_CUSTOM_CA_SIGNED_CERTIFICATES` instead." ); - //intentional fallthrough + logger.warn( + "Option `TRUST_SIGNED_CERTIFICATE` has been deprecated and will be removed in a future " + + "version " + + "of the driver. Please switch to use `TRUST_CUSTOM_CA_SIGNED_CERTIFICATES` instead." ); + //intentional fallthrough case TRUST_CUSTOM_CA_SIGNED_CERTIFICATES: return SecurityPlan.forSignedCertificates( config.trustStrategy().certFile() ); case TRUST_ON_FIRST_USE: return SecurityPlan.forTrustOnFirstUse( config.trustStrategy().certFile(), address, logger ); default: - throw new ClientException( "Unknown TLS authentication strategy: " + config.trustStrategy().strategy().name() ); + throw new ClientException( + "Unknown TLS authentication strategy: " + config.trustStrategy().strategy().name() ); } } else diff --git a/driver/src/main/java/org/neo4j/driver/v1/exceptions/ServiceUnavailableException.java b/driver/src/main/java/org/neo4j/driver/v1/exceptions/ServiceUnavailableException.java index 5172413942..f4d795d051 100644 --- a/driver/src/main/java/org/neo4j/driver/v1/exceptions/ServiceUnavailableException.java +++ b/driver/src/main/java/org/neo4j/driver/v1/exceptions/ServiceUnavailableException.java @@ -28,4 +28,9 @@ public ServiceUnavailableException( String message ) { super( message ); } + + public ServiceUnavailableException( String message, Throwable throwable ) + { + super( message, throwable); + } } diff --git a/driver/src/main/java/org/neo4j/driver/v1/exceptions/SessionExpiredException.java b/driver/src/main/java/org/neo4j/driver/v1/exceptions/SessionExpiredException.java index 84ba94eca9..f4904806a1 100644 --- a/driver/src/main/java/org/neo4j/driver/v1/exceptions/SessionExpiredException.java +++ b/driver/src/main/java/org/neo4j/driver/v1/exceptions/SessionExpiredException.java @@ -26,6 +26,11 @@ */ public class SessionExpiredException extends Neo4jException { + public SessionExpiredException( String message) + { + super( message ); + } + public SessionExpiredException( String message, Throwable throwable ) { super( message, throwable ); diff --git a/driver/src/main/java/org/neo4j/driver/v1/util/BiFunction.java b/driver/src/main/java/org/neo4j/driver/v1/util/BiFunction.java new file mode 100644 index 0000000000..b14e4938e8 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/v1/util/BiFunction.java @@ -0,0 +1,39 @@ +/** + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.v1.util; + +/** + * Same as {@link java.util.function.BiFunction}, but defined here to work in versions older than java 8. + * + * @param the type of the first argument to the function + * @param the type of the second argument to the function + * @param the type of the result of the function + * + */ +public interface BiFunction { + + /** + * Applies this function to the given arguments. + * + * @param t the first function argument + * @param u the second function argument + * @return the function result + */ + R apply(T t, U u); +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/ClusterDriverStubTest.java b/driver/src/test/java/org/neo4j/driver/internal/ClusterDriverStubTest.java new file mode 100644 index 0000000000..75a5e26b95 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/ClusterDriverStubTest.java @@ -0,0 +1,480 @@ +/** + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal; + +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.rules.ExpectedException; + +import java.io.IOException; +import java.net.URI; +import java.net.URL; +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; + +import org.neo4j.driver.internal.logging.ConsoleLogging; +import org.neo4j.driver.internal.net.BoltServerAddress; +import org.neo4j.driver.v1.AccessMode; +import org.neo4j.driver.v1.Config; +import org.neo4j.driver.v1.GraphDatabase; +import org.neo4j.driver.v1.Record; +import org.neo4j.driver.v1.Session; +import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; +import org.neo4j.driver.v1.exceptions.SessionExpiredException; +import org.neo4j.driver.v1.util.Function; +import org.neo4j.driver.v1.util.StubServer; + +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.core.IsEqual.equalTo; +import static org.hamcrest.core.IsNot.not; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class ClusterDriverStubTest +{ + @Rule + public ExpectedException exception = ExpectedException.none(); + + private static final Config config = Config.build().withLogging( new ConsoleLogging( Level.INFO ) ).toConfig(); + + @Ignore + public void shouldDiscoverServers() throws IOException, InterruptedException, StubServer.ForceKilled + { + // Given + StubServer server = StubServer.start( resource( "discover_servers.script" ), 9001 ); + URI uri = URI.create( "bolt+routing://127.0.0.1:9001" ); + + // When + try ( ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config ) ) + { + // Then + Set addresses = driver.routingServers(); + assertThat( addresses, hasSize( 3 ) ); + assertThat( addresses, hasItem( new BoltServerAddress( "127.0.0.1", 9001 ) ) ); + assertThat( addresses, hasItem( new BoltServerAddress( "127.0.0.1", 9002 ) ) ); + assertThat( addresses, hasItem( new BoltServerAddress( "127.0.0.1", 9003 ) ) ); + } + + // Finally + assertThat( server.exitStatus(), equalTo( 0 ) ); + } + + @Ignore + public void shouldDiscoverNewServers() throws IOException, InterruptedException, StubServer.ForceKilled + { + // Given + StubServer server = StubServer.start( resource( "discover_new_servers.script" ), 9001 ); + URI uri = URI.create( "bolt+routing://127.0.0.1:9001" ); + + // When + try ( ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config ) ) + { + // Then + Set addresses = driver.routingServers(); + assertThat( addresses, hasSize( 4 ) ); + assertThat( addresses, hasItem( new BoltServerAddress( "127.0.0.1", 9001 ) ) ); + assertThat( addresses, hasItem( new BoltServerAddress( "127.0.0.1", 9002 ) ) ); + assertThat( addresses, hasItem( new BoltServerAddress( "127.0.0.1", 9003 ) ) ); + assertThat( addresses, hasItem( new BoltServerAddress( "127.0.0.1", 9004 ) ) ); + } + + // Finally + assertThat( server.exitStatus(), equalTo( 0 ) ); + } + + @Ignore + public void shouldHandleEmptyResponse() throws IOException, InterruptedException, StubServer.ForceKilled + { + // Given + StubServer server = StubServer.start( resource( "handle_empty_response.script" ), 9001 ); + URI uri = URI.create( "bolt+routing://127.0.0.1:9001" ); + try ( ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config ) ) + { + Set servers = driver.routingServers(); + assertThat( servers, hasSize( 1 ) ); + assertThat( servers, hasItem( new BoltServerAddress( "127.0.0.1", 9001 ) ) ); + } + + // Finally + assertThat( server.exitStatus(), equalTo( 0 ) ); + } + + @Ignore + public void shouldHandleAcquireReadSession() throws IOException, InterruptedException, StubServer.ForceKilled + { + // Given + StubServer server = StubServer.start( resource( "acquire_endpoints.script" ), 9001 ); + + //START a read server + StubServer readServer = StubServer.start( resource( "read_server.script" ), 9005 ); + URI uri = URI.create( "bolt+routing://127.0.0.1:9001" ); + try ( ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config ); + Session session = driver.session( AccessMode.READ ) ) + { + List result = session.run( "MATCH (n) RETURN n.name" ).list( new Function() + { + @Override + public String apply( Record record ) + { + return record.get( "n.name" ).asString(); + } + } ); + + assertThat( result, equalTo( Arrays.asList( "Bob", "Alice", "Tina" ) ) ); + + } + // Finally + assertThat( server.exitStatus(), equalTo( 0 ) ); + assertThat( readServer.exitStatus(), equalTo( 0 ) ); + } + + @Ignore + public void shouldRoundRobinReadServers() throws IOException, InterruptedException, StubServer.ForceKilled + { + // Given + StubServer server = StubServer.start( resource( "acquire_endpoints.script" ), 9001 ); + + //START two read servers + StubServer readServer1 = StubServer.start( resource( "read_server.script" ), 9005 ); + StubServer readServer2 = StubServer.start( resource( "read_server.script" ), 9006 ); + URI uri = URI.create( "bolt+routing://127.0.0.1:9001" ); + try ( ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config ) ) + { + // Run twice, one on each read server + for ( int i = 0; i < 2; i++ ) + { + try ( Session session = driver.session( AccessMode.READ ) ) + { + assertThat( session.run( "MATCH (n) RETURN n.name" ).list( new Function() + { + @Override + public String apply( Record record ) + { + return record.get( "n.name" ).asString(); + } + } ), equalTo( Arrays.asList( "Bob", "Alice", "Tina" ) ) ); + } + } + } + // Finally + assertThat( server.exitStatus(), equalTo( 0 ) ); + assertThat( readServer1.exitStatus(), equalTo( 0 ) ); + assertThat( readServer2.exitStatus(), equalTo( 0 ) ); + } + + @Ignore + public void shouldThrowSessionExpiredIfReadServerDisappears() + throws IOException, InterruptedException, StubServer.ForceKilled + { + //Expect + exception.expect( SessionExpiredException.class ); + exception.expectMessage( "Server at 127.0.0.1:9005 is no longer available" ); + + // Given + StubServer server = StubServer.start( resource( "acquire_endpoints.script" ), 9001 ); + + //START a read server + StubServer.start( resource( "dead_server.script" ), 9005 ); + URI uri = URI.create( "bolt+routing://127.0.0.1:9001" ); + try ( ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config ); + Session session = driver.session( AccessMode.READ ) ) + { + session.run( "MATCH (n) RETURN n.name" ); + } + // Finally + assertThat( server.exitStatus(), equalTo( 0 ) ); + } + + @Ignore + public void shouldThrowSessionExpiredIfWriteServerDisappears() + throws IOException, InterruptedException, StubServer.ForceKilled + { + //Expect + exception.expect( SessionExpiredException.class ); + //exception.expectMessage( "Server at 127.0.0.1:9006 is no longer available" ); + + // Given + StubServer server = StubServer.start( resource( "acquire_endpoints.script" ), 9001 ); + + //START a dead write servers + StubServer.start( resource( "dead_server.script" ), 9007 ); + URI uri = URI.create( "bolt+routing://127.0.0.1:9001" ); + try ( ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config ); + Session session = driver.session( AccessMode.WRITE ) ) + { + session.run( "MATCH (n) RETURN n.name" ).consume(); + } + // Finally + assertThat( server.exitStatus(), equalTo( 0 ) ); + } + + @Ignore + public void shouldHandleAcquireWriteSession() throws IOException, InterruptedException, StubServer.ForceKilled + { + // Given + StubServer server = StubServer.start( resource( "acquire_endpoints.script" ), 9001 ); + + //START a write server + StubServer writeServer = StubServer.start( resource( "write_server.script" ), 9007 ); + URI uri = URI.create( "bolt+routing://127.0.0.1:9001" ); + try ( ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config ); + Session session = driver.session( AccessMode.WRITE ) ) + { + session.run( "CREATE (n {name:'Bob'})" ); + } + // Finally + assertThat( server.exitStatus(), equalTo( 0 ) ); + assertThat( writeServer.exitStatus(), equalTo( 0 ) ); + } + + @Ignore + public void shouldRoundRobinWriteSessions() throws IOException, InterruptedException, StubServer.ForceKilled + { + // Given + StubServer server = StubServer.start( resource( "acquire_endpoints.script" ), 9001 ); + + //START a write server + StubServer writeServer1 = StubServer.start( resource( "write_server.script" ), 9007 ); + StubServer writeServer2 = StubServer.start( resource( "write_server.script" ), 9008 ); + URI uri = URI.create( "bolt+routing://127.0.0.1:9001" ); + try ( ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config ) ) + { + for ( int i = 0; i < 2; i++ ) + { + try(Session session = driver.session() ) + { + session.run( "CREATE (n {name:'Bob'})" ); + } + } + } + // Finally + assertThat( server.exitStatus(), equalTo( 0 ) ); + assertThat( writeServer1.exitStatus(), equalTo( 0 ) ); + assertThat( writeServer2.exitStatus(), equalTo( 0 ) ); + } + + @Ignore + public void shouldRememberEndpoints() throws IOException, InterruptedException, StubServer.ForceKilled + { + // Given + StubServer server = StubServer.start( resource( "acquire_endpoints.script" ), 9001 ); + + //START a read server + StubServer readServer = StubServer.start( resource( "read_server.script" ), 9005 ); + URI uri = URI.create( "bolt+routing://127.0.0.1:9001" ); + try ( ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config ); + Session session = driver.session( AccessMode.READ ) ) + { + session.run( "MATCH (n) RETURN n.name" ).consume(); + + assertThat( driver.readServers(), hasSize( 2 )); + assertThat( driver.readServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9005 ) ) ); + assertThat( driver.readServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9006 ) ) ); + assertThat( driver.writeServers(), hasSize( 2 )); + assertThat( driver.writeServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9007 ) ) ); + assertThat( driver.writeServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9008 ) ) ); + //Make sure we don't cache acquired servers as discovery servers + assertThat( driver.routingServers(), not(hasItem( new BoltServerAddress( "127.0.0.1", 9005 )))); + assertThat( driver.routingServers(), not(hasItem( new BoltServerAddress( "127.0.0.1", 9006 )))); + } + // Finally + assertThat( server.exitStatus(), equalTo( 0 ) ); + assertThat( readServer.exitStatus(), equalTo( 0 ) ); + } + + @Ignore + public void shouldForgetEndpointsOnFailure() throws IOException, InterruptedException, StubServer.ForceKilled + { + // Given + StubServer server = StubServer.start( resource( "acquire_endpoints.script" ), 9001 ); + + //START a read server + StubServer.start( resource( "dead_server.script" ), 9005 ); + URI uri = URI.create( "bolt+routing://127.0.0.1:9001" ); + ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config ); + boolean failed = false; + try + { + Session session = driver.session( AccessMode.READ ); + session.run( "MATCH (n) RETURN n.name" ).consume(); + session.close(); + } + catch ( SessionExpiredException e ) + { + failed = true; + } + + assertTrue( failed ); + assertThat( driver.readServers(), not(hasItem( new BoltServerAddress( "127.0.0.1", 9005 ) ) )); + assertThat( driver.writeServers(), hasSize( 2 ) ); + driver.close(); + + // Finally + assertThat( server.exitStatus(), equalTo( 0 ) ); + } + + @Ignore + public void shouldRediscoverIfNecessaryOnSessionAcquisition() throws IOException, InterruptedException, StubServer.ForceKilled + { + // Given + StubServer server = StubServer.start( resource( "rediscover.script" ), 9001 ); + + URI uri = URI.create( "bolt+routing://127.0.0.1:9001" ); + //START a read server + StubServer.start( resource( "read_server.script" ), 9005 ); + + //On creation we only find ourselves + ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config ); + assertThat( driver.routingServers(), hasSize( 1 ) ); + assertThat( driver.routingServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9001 ) )); + + //since we know about less than three servers a rediscover should be triggered + Session session = driver.session( AccessMode.READ ); + assertThat( driver.routingServers(), hasSize( 4 ) ); + assertThat( driver.routingServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9001 ) )); + assertThat( driver.routingServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9002 ) )); + assertThat( driver.routingServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9003 ) )); + assertThat( driver.routingServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9004 ) )); + + session.close(); + driver.close(); + + // Finally + assertThat( server.exitStatus(), equalTo( 0 ) ); + } + + @Ignore + public void shouldOnlyGetServersOnce() throws IOException, InterruptedException, StubServer.ForceKilled + { + // Given + StubServer server = StubServer.start( resource( "rediscover.script" ), 9001 ); + + URI uri = URI.create( "bolt+routing://127.0.0.1:9001" ); + //START a read server + StubServer.start( resource( "read_server.script" ), 9005 ); + + //On creation we only find ourselves + final ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config ); + assertThat( driver.routingServers(), hasSize( 1 ) ); + assertThat( driver.routingServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9001 ) )); + + ExecutorService runner = Executors.newFixedThreadPool( 10 ); + for ( int i = 0; i < 10; i++ ) + { + runner.submit( new Runnable() + { + @Override + public void run() + { + //noinspection EmptyTryBlock + try(Session ignore = driver.session( AccessMode.READ )) + { + //empty + } + + } + } ); + } + runner.awaitTermination( 10, TimeUnit.SECONDS ); + //since we know about less than three servers a rediscover should be triggered + assertThat( driver.routingServers(), hasSize( 4 ) ); + assertThat( driver.routingServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9001 ) )); + assertThat( driver.routingServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9002 ) )); + assertThat( driver.routingServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9003 ) )); + assertThat( driver.routingServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9004 ) )); + + driver.close(); + + // Finally + assertThat( server.exitStatus(), equalTo( 0 ) ); + } + + @Ignore + public void shouldFailOnNonDiscoverableServer() throws IOException, InterruptedException, StubServer.ForceKilled + { + // When + StubServer server = StubServer.start( resource( "non_discovery_server.script" ), 9001 ); + + URI uri = URI.create( "bolt+routing://127.0.0.1:9001" ); + boolean failed = false; + //noinspection EmptyTryBlock + try + { + GraphDatabase.driver( uri, config ); + } + catch ( ServiceUnavailableException e ) + { + failed = true; + } + assertTrue( failed ); + + // Finally + assertThat( server.exitStatus(), equalTo( 0 ) ); + } + + @Ignore + public void shouldHandleLeaderSwitchWhenWriting() + throws IOException, InterruptedException, StubServer.ForceKilled + { + // Given + StubServer server = StubServer.start( resource( "acquire_endpoints.script" ), 9001 ); + + //START a write server that doesn't accept writes + StubServer.start( resource( "not_able_to_write_server.script" ), 9007 ); + URI uri = URI.create( "bolt+routing://127.0.0.1:9001" ); + ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config ); + boolean failed = false; + try ( Session session = driver.session( AccessMode.WRITE ) ) + { + assertThat(driver.writeServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9007 ) )); + assertThat(driver.writeServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9008 ) )); + session.run( "CREATE ()" ).consume(); + } + catch (SessionExpiredException e) + { + failed = true; + assertThat(e.getMessage(), equalTo( "Server at 127.0.0.1:9007 no longer accepts writes" )); + } + assertTrue( failed ); + assertThat( driver.writeServers(), not( hasItem( new BoltServerAddress( "127.0.0.1", 9007 ) ) ) ); + assertThat( driver.writeServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9008 ) ) ); + assertTrue( driver.connectionPool().hasAddress( new BoltServerAddress( "127.0.0.1", 9007 ) ) ); + + driver.close(); + // Finally + assertThat( server.exitStatus(), equalTo( 0 ) ); + } + + String resource( String fileName ) + { + URL resource = ClusterDriverStubTest.class.getClassLoader().getResource( fileName ); + if ( resource == null ) + { + fail( fileName + " does not exists" ); + } + return resource.getFile(); + } +} \ No newline at end of file diff --git a/driver/src/test/java/org/neo4j/driver/internal/ClusterDriverTest.java b/driver/src/test/java/org/neo4j/driver/internal/ClusterDriverTest.java index b3379d8bd8..26a57f4177 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/ClusterDriverTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/ClusterDriverTest.java @@ -16,98 +16,276 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.neo4j.driver.internal; -import org.junit.Ignore; +import org.hamcrest.Matchers; import org.junit.Rule; +import org.junit.Test; import org.junit.rules.ExpectedException; -import java.io.IOException; -import java.net.URI; -import java.util.Set; -import java.util.logging.Level; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; -import org.neo4j.driver.internal.logging.ConsoleLogging; import org.neo4j.driver.internal.net.BoltServerAddress; -import org.neo4j.driver.v1.Config; -import org.neo4j.driver.v1.GraphDatabase; -import org.neo4j.driver.v1.util.StubServer; +import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.spi.ConnectionPool; +import org.neo4j.driver.internal.value.IntegerValue; +import org.neo4j.driver.internal.value.StringValue; +import org.neo4j.driver.v1.AccessMode; +import org.neo4j.driver.v1.Logger; +import org.neo4j.driver.v1.Logging; +import org.neo4j.driver.v1.Record; +import org.neo4j.driver.v1.Session; +import org.neo4j.driver.v1.StatementResult; +import org.neo4j.driver.v1.Value; +import org.neo4j.driver.v1.exceptions.ClientException; +import org.neo4j.driver.v1.exceptions.NoSuchRecordException; +import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; +import org.neo4j.driver.v1.summary.ResultSummary; +import org.neo4j.driver.v1.util.BiFunction; +import org.neo4j.driver.v1.util.Function; -import static org.hamcrest.Matchers.hasItem; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.core.IsEqual.equalTo; -import static org.junit.Assert.assertThat; +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.core.IsNot.not; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.neo4j.driver.internal.security.SecurityPlan.insecure; public class ClusterDriverTest { - @Rule public ExpectedException exception = ExpectedException.none(); - private static final Config config = Config.build().withLogging( new ConsoleLogging( Level.INFO ) ).toConfig(); + private static final BoltServerAddress SEED = new BoltServerAddress( "localhost", 7687 ); + private static final String GET_SERVERS = "CALL dbms.cluster.routing.getServers"; + private static final List NO_ADDRESSES = Collections.emptyList(); - @Ignore - public void shouldDiscoverServers() throws IOException, InterruptedException, StubServer.ForceKilled + @Test + public void shouldDoRoutingOnInitialization() { // Given - StubServer server = StubServer.start( "../driver/src/test/resources/discover_servers.script", 9001 ); - URI uri = URI.create( "bolt+discovery://127.0.0.1:9001" ); + final Session session = mock( Session.class ); + when( session.run( GET_SERVERS ) ).thenReturn( + getServers( singletonList( boltAddress( "localhost", 1111 ) ), + singletonList( boltAddress( "localhost", 2222 ) ), + singletonList( boltAddress( "localhost", 3333 ) ) ) ); // When - try ( ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config ) ) - { - // Then - Set addresses = driver.servers(); - assertThat( addresses, hasSize( 3 ) ); - assertThat( addresses, hasItem( new BoltServerAddress( "127.0.0.1", 9001 ) ) ); - assertThat( addresses, hasItem( new BoltServerAddress( "127.0.0.1", 9002 ) ) ); - assertThat( addresses, hasItem( new BoltServerAddress( "127.0.0.1", 9003 ) ) ); - } - - // Finally - assertThat( server.exitStatus(), equalTo( 0 ) ); + ClusterDriver clusterDriver = forSession( session ); + + // Then + assertThat( clusterDriver.routingServers(), + containsInAnyOrder( boltAddress( "localhost", 1111 ), SEED ) ); + assertThat( clusterDriver.readServers(), + containsInAnyOrder( boltAddress( "localhost", 2222 ) ) ); + assertThat( clusterDriver.writeServers(), + containsInAnyOrder( boltAddress( "localhost", 3333 ) ) ); + } - @Ignore - public void shouldDiscoverNewServers() throws IOException, InterruptedException, StubServer.ForceKilled + @Test + public void shouldDoReRoutingOnSessionAcquisitionIfNecessary() { // Given - StubServer server = StubServer.start( "../driver/src/test/resources/discover_new_servers.script", 9001 ); - URI uri = URI.create( "bolt+discovery://127.0.0.1:9001" ); + final Session session = mock( Session.class ); + when( session.run( GET_SERVERS ) ) + .thenReturn( + getServers( singletonList( boltAddress( "localhost", 1111 ) ), NO_ADDRESSES, NO_ADDRESSES ) ) + .thenReturn( + getServers( singletonList( boltAddress( "localhost", 1112 ) ), + singletonList( boltAddress( "localhost", 2222 ) ), + singletonList( boltAddress( "localhost", 3333 ) ) ) ); + + ClusterDriver clusterDriver = forSession( session ); + + assertThat( clusterDriver.routingServers(), + containsInAnyOrder( boltAddress( "localhost", 1111 ), SEED ) ); + assertThat( clusterDriver.readServers(), Matchers.empty() ); + assertThat( clusterDriver.writeServers(), Matchers.empty() ); + // When - try ( ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config ) ) - { - // Then - Set addresses = driver.servers(); - assertThat( addresses, hasSize( 4 ) ); - assertThat( addresses, hasItem( new BoltServerAddress( "127.0.0.1", 9001 ) ) ); - assertThat( addresses, hasItem( new BoltServerAddress( "127.0.0.1", 9002 ) ) ); - assertThat( addresses, hasItem( new BoltServerAddress( "127.0.0.1", 9003 ) ) ); - assertThat( addresses, hasItem( new BoltServerAddress( "127.0.0.1", 9004 ) ) ); - } - - // Finally - assertThat( server.exitStatus(), equalTo( 0 ) ); + clusterDriver.session( AccessMode.READ ); + + // Then + assertThat( clusterDriver.routingServers(), + containsInAnyOrder( boltAddress( "localhost", 1111 ), boltAddress( "localhost", 1112 ), SEED ) ); + assertThat( clusterDriver.readServers(), + containsInAnyOrder( boltAddress( "localhost", 2222 ) ) ); + assertThat( clusterDriver.writeServers(), + containsInAnyOrder( boltAddress( "localhost", 3333 ) ) ); + } + + @Test + public void shouldNotDoReRoutingOnSessionAcquisitionIfNotNecessary() + { + // Given + final Session session = mock( Session.class ); + when( session.run( GET_SERVERS ) ) + .thenReturn( + getServers( asList( boltAddress( "localhost", 1111 ), boltAddress( "localhost", 1112 ), + boltAddress( "localhost", 1113 ) ), + singletonList( boltAddress( "localhost", 2222 ) ), + singletonList( boltAddress( "localhost", 3333 ) ) ) ) + .thenReturn( + getServers( singletonList( boltAddress( "localhost", 5555 ) ), NO_ADDRESSES, NO_ADDRESSES ) ); + + ClusterDriver clusterDriver = forSession( session ); + + // When + clusterDriver.session( AccessMode.WRITE ); + + // Then + assertThat( clusterDriver.routingServers(), + not( hasItem( boltAddress( "localhost", 5555 ) ) ) ); } - @Ignore - public void shouldHandleEmptyResponse() throws IOException, InterruptedException, StubServer.ForceKilled + @Test + public void shouldFailIfNoRouting() { // Given - StubServer server = StubServer.start( "../driver/src/test/resources/handle_empty_response.script", 9001 ); - URI uri = URI.create( "bolt+discovery://127.0.0.1:9001" ); - try (ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config )) + final Session session = mock( Session.class ); + when( session.run( GET_SERVERS ) ) + .thenThrow( + new ClientException( "Neo.ClientError.Procedure.ProcedureNotFound", "Procedure not found" ) ); + + // Expect + exception.expect( ServiceUnavailableException.class ); + + // When + forSession( session ); + } + + private ClusterDriver forSession( final Session session ) + { + return new ClusterDriver( SEED, pool(), insecure(), + new BiFunction() + { + @Override + public Session apply( Connection connection, Logger ignore ) + { + return session; + } + }, logging() ); + } + + private BoltServerAddress boltAddress( String host, int port ) + { + return new BoltServerAddress( host, port ); + } + + StatementResult getServers( final List routers, final List readers, + final List writers ) + { + + + return new StatementResult() { - Set servers = driver.servers(); - assertThat(servers, hasSize( 1 )); - assertThat(servers, hasItem( new BoltServerAddress( "127.0.0.1", 9001 ) )); - } + private final int totalSize = routers.size() + readers.size() + writers.size(); + private final Iterator routeIterator = routers.iterator(); + private final Iterator readIterator = readers.iterator(); + private final Iterator writeIterator = writers.iterator(); + private int counter = 0; + + @Override + public List keys() + { + return asList( "address", "mode", "expires" ); + } - // Finally - assertThat( server.exitStatus(), equalTo( 0 ) ); + @Override + public boolean hasNext() + { + return counter++ < totalSize; + } + + @Override + public Record next() + { + if ( routeIterator.hasNext() ) + { + return new InternalRecord( asList( "address", "mode", "expires" ), + new Value[]{new StringValue( routeIterator.next().toString() ), + new StringValue( "ROUTE" ), + new IntegerValue( Long.MAX_VALUE )} ); + } + else if ( readIterator.hasNext() ) + { + return new InternalRecord( asList( "address", "mode", "expires" ), + new Value[]{new StringValue( readIterator.next().toString() ), + new StringValue( "READ" ), + new IntegerValue( Long.MAX_VALUE )} ); + } + else if ( writeIterator.hasNext() ) + { + return new InternalRecord( asList( "address", "mode", "expires" ), + new Value[]{new StringValue( writeIterator.next().toString() ), + new StringValue( "WRITE" ), + new IntegerValue( Long.MAX_VALUE )} ); + } + else + { + return Collections.emptyIterator().next(); + } + } + + @Override + public Record single() throws NoSuchRecordException + { + return null; + } + + @Override + public Record peek() + { + return null; + } + + @Override + public List list() + { + return null; + } + + @Override + public List list( Function mapFunction ) + { + return null; + } + + @Override + public ResultSummary consume() + { + return null; + } + + @Override + public void remove() + { + throw new UnsupportedOperationException( ); + } + }; } + private ConnectionPool pool() + { + ConnectionPool pool = mock( ConnectionPool.class ); + Connection connection = mock( Connection.class ); + when( connection.isOpen() ).thenReturn( true ); + when( pool.acquire( SEED ) ).thenReturn( connection ); + return pool; + } + private Logging logging() + { + Logging mock = mock( Logging.class ); + when( mock.getLog( anyString() ) ).thenReturn( mock( Logger.class ) ); + return mock; + } } \ No newline at end of file diff --git a/driver/src/test/java/org/neo4j/driver/internal/ClusteredNetworkSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/ClusteredNetworkSessionTest.java new file mode 100644 index 0000000000..563b541548 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/ClusteredNetworkSessionTest.java @@ -0,0 +1,162 @@ +/** + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal; + +import org.junit.Before; +import org.junit.Test; + +import java.util.Map; + +import org.neo4j.driver.internal.net.BoltServerAddress; +import org.neo4j.driver.internal.spi.Collector; +import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.v1.Logger; +import org.neo4j.driver.v1.exceptions.ClientException; +import org.neo4j.driver.v1.exceptions.ConnectionFailureException; +import org.neo4j.driver.v1.exceptions.SessionExpiredException; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +public class ClusteredNetworkSessionTest +{ + private Connection connection; + private ClusteredErrorHandler onError; + private static final BoltServerAddress LOCALHOST = new BoltServerAddress( "localhost", 7687 ); + + @Before + public void setUp() + { + connection = mock( Connection.class ); + when( connection.address() ).thenReturn( LOCALHOST ); + when( connection.isOpen() ).thenReturn( true ); + onError = mock( ClusteredErrorHandler.class ); + } + + @Test + public void shouldHandleConnectionFailures() + { + // Given + doThrow( new ConnectionFailureException( "oh no" ) ). + when( connection ).run( anyString(), any( Map.class ), any( Collector.class ) ); + + ClusteredNetworkSession result = + new ClusteredNetworkSession( connection, onError, mock( Logger.class ) ); + + // When + try + { + result.run( "CREATE ()" ); + fail(); + } + catch ( SessionExpiredException e ) + { + //ignore + } + + // Then + verify( onError ).onConnectionFailure( LOCALHOST ); + verifyNoMoreInteractions( onError ); + } + + @Test + public void shouldHandleWriteFailures() + { + // Given + doThrow( new ClientException( "Neo.ClientError.General.ForbiddenOnFollower", "oh no!" ) ). + when( connection ).run( anyString(), any( Map.class ), any( Collector.class ) ); + ClusteredNetworkSession session = + new ClusteredNetworkSession( connection, onError, mock( Logger.class ) ); + + // When + try + { + session.run( "CREATE ()" ); + fail(); + } + catch ( SessionExpiredException e ) + { + //ignore + } + + // Then + verify( onError ).onWriteFailure( LOCALHOST ); + verifyNoMoreInteractions( onError ); + } + + @Test + public void shouldRethrowNonWriteFailures() + { + // Given + ClientException toBeThrown = new ClientException( "code", "oh no!" ); + doThrow( toBeThrown ). + when( connection ).run( anyString(), any( Map.class ), any( Collector.class ) ); + ClusteredNetworkSession session = + new ClusteredNetworkSession( connection, onError, mock( Logger.class ) ); + + // When + try + { + session.run( "CREATE ()" ); + fail(); + } + catch ( ClientException e ) + { + assertThat( e, is( toBeThrown ) ); + } + + // Then + verifyZeroInteractions( onError ); + } + + @Test + public void shouldHandleConnectionFailuresOnClose() + { + // Given + doThrow( new ConnectionFailureException( "oh no" ) ). + when( connection ).sync(); + + ClusteredNetworkSession session = + new ClusteredNetworkSession( connection, onError, mock( Logger.class ) ); + + // When + try + { + session.close(); + fail(); + } + catch ( SessionExpiredException e ) + { + //ignore + } + + // Then + verify( onError ).onConnectionFailure( LOCALHOST ); + verifyNoMoreInteractions( onError ); + } +} \ No newline at end of file diff --git a/driver/src/test/java/org/neo4j/driver/internal/ClusteredStatementResultTest.java b/driver/src/test/java/org/neo4j/driver/internal/ClusteredStatementResultTest.java new file mode 100644 index 0000000000..228b722f79 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/ClusteredStatementResultTest.java @@ -0,0 +1,384 @@ +/** + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal; + +import org.junit.Test; + +import org.neo4j.driver.internal.net.BoltServerAddress; +import org.neo4j.driver.v1.StatementResult; +import org.neo4j.driver.v1.exceptions.ClientException; +import org.neo4j.driver.v1.exceptions.ConnectionFailureException; +import org.neo4j.driver.v1.exceptions.SessionExpiredException; + +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +public class ClusteredStatementResultTest +{ + + private static final BoltServerAddress LOCALHOST = new BoltServerAddress( "localhost", 7687 ); + private StatementResult delegate = mock( StatementResult.class ); + private ClusteredErrorHandler onError = mock( ClusteredErrorHandler.class ); + + @Test + public void shouldHandleConnectionFailureOnConsume() + { + // Given + when( delegate.consume() ).thenThrow( new ConnectionFailureException( "oh no" ) ); + ClusteredStatementResult result = + new ClusteredStatementResult( delegate, LOCALHOST, onError ); + + // When + try + { + result.consume(); + fail(); + } + catch ( SessionExpiredException e ) + { + //ignore + } + + // Then + verify( onError ).onConnectionFailure( LOCALHOST ); + verifyNoMoreInteractions( onError ); + } + + @Test + public void shouldHandleWriteFailureOnConsume() + { + // Given + when( delegate.consume() ) + .thenThrow( new ClientException( "Neo.ClientError.General.ForbiddenOnFollower", "oh no!" ) ); + ClusteredStatementResult result = + new ClusteredStatementResult( delegate, LOCALHOST, onError ); + + // When + try + { + result.consume(); + fail(); + } + catch ( SessionExpiredException e ) + { + //ignore + } + + // Then + verify( onError ).onWriteFailure( LOCALHOST ); + verifyNoMoreInteractions( onError ); + } + + @Test + public void shouldHandleConnectionFailureOnHasNext() + { + // Given + when( delegate.hasNext() ).thenThrow( new ConnectionFailureException( "oh no" ) ); + ClusteredStatementResult result = + new ClusteredStatementResult( delegate, LOCALHOST, onError ); + + // When + try + { + result.hasNext(); + fail(); + } + catch ( SessionExpiredException e ) + { + //ignore + } + + // Then + verify( onError ).onConnectionFailure( LOCALHOST ); + verifyNoMoreInteractions( onError ); + } + + @Test + public void shouldHandleWriteFailureOnHasNext() + { + // Given + when( delegate.hasNext() ) + .thenThrow( new ClientException( "Neo.ClientError.General.ForbiddenOnFollower", "oh no!" ) ); + ClusteredStatementResult result = + new ClusteredStatementResult( delegate, LOCALHOST, onError ); + + // When + try + { + result.hasNext(); + fail(); + } + catch ( SessionExpiredException e ) + { + //ignore + } + + // Then + verify( onError ).onWriteFailure( LOCALHOST ); + verifyNoMoreInteractions( onError ); + } + + @Test + public void shouldHandleConnectionFailureOnKeys() + { + // Given + when( delegate.keys() ).thenThrow( new ConnectionFailureException( "oh no" ) ); + ClusteredStatementResult result = + new ClusteredStatementResult( delegate, LOCALHOST, onError ); + + // When + try + { + result.keys(); + fail(); + } + catch ( SessionExpiredException e ) + { + //ignore + } + + // Then + verify( onError ).onConnectionFailure( LOCALHOST ); + verifyNoMoreInteractions( onError ); + } + + @Test + public void shouldHandleWriteFailureOnKeys() + { + // Given + when( delegate.keys() ) + .thenThrow( new ClientException( "Neo.ClientError.General.ForbiddenOnFollower", "oh no!" ) ); + ClusteredStatementResult result = + new ClusteredStatementResult( delegate, LOCALHOST, onError ); + + // When + try + { + result.keys(); + fail(); + } + catch ( SessionExpiredException e ) + { + //ignore + } + + // Then + verify( onError ).onWriteFailure( LOCALHOST ); + verifyNoMoreInteractions( onError ); + } + + @Test + public void shouldHandleConnectionFailureOnList() + { + // Given + when( delegate.list() ).thenThrow( new ConnectionFailureException( "oh no" ) ); + ClusteredStatementResult result = + new ClusteredStatementResult( delegate, LOCALHOST, onError ); + + // When + try + { + result.list(); + fail(); + } + catch ( SessionExpiredException e ) + { + //ignore + } + + // Then + verify( onError ).onConnectionFailure( LOCALHOST ); + verifyNoMoreInteractions( onError ); + } + + @Test + public void shouldHandleWriteFailureOnList() + { + // Given + when( delegate.list() ) + .thenThrow( new ClientException( "Neo.ClientError.General.ForbiddenOnFollower", "oh no!" ) ); + ClusteredStatementResult result = + new ClusteredStatementResult( delegate, LOCALHOST, onError ); + + // When + try + { + result.list(); + fail(); + } + catch ( SessionExpiredException e ) + { + //ignore + } + + // Then + verify( onError ).onWriteFailure( LOCALHOST ); + verifyNoMoreInteractions( onError ); + } + + @Test + public void shouldHandleConnectionFailureOnNext() + { + // Given + when( delegate.next() ).thenThrow( new ConnectionFailureException( "oh no" ) ); + ClusteredStatementResult result = + new ClusteredStatementResult( delegate, LOCALHOST, onError ); + + // When + try + { + result.next(); + fail(); + } + catch ( SessionExpiredException e ) + { + //ignore + } + + // Then + verify( onError ).onConnectionFailure( LOCALHOST ); + verifyNoMoreInteractions( onError ); + } + + @Test + public void shouldHandleWriteFailureOnNext() + { + // Given + when( delegate.next() ) + .thenThrow( new ClientException( "Neo.ClientError.General.ForbiddenOnFollower", "oh no!" ) ); + ClusteredStatementResult result = + new ClusteredStatementResult( delegate, LOCALHOST, onError ); + + // When + try + { + result.next(); + fail(); + } + catch ( SessionExpiredException e ) + { + //ignore + } + + // Then + verify( onError ).onWriteFailure( LOCALHOST ); + verifyNoMoreInteractions( onError ); + } + + @Test + public void shouldHandleConnectionFailureOnPeek() + { + // Given + when( delegate.peek() ).thenThrow( new ConnectionFailureException( "oh no" ) ); + ClusteredStatementResult result = + new ClusteredStatementResult( delegate, LOCALHOST, onError ); + + // When + try + { + result.peek(); + fail(); + } + catch ( SessionExpiredException e ) + { + //ignore + } + + // Then + verify( onError ).onConnectionFailure( LOCALHOST ); + verifyNoMoreInteractions( onError ); + } + + @Test + public void shouldHandleWriteFailureOnPeek() + { + // Given + when( delegate.peek() ) + .thenThrow( new ClientException( "Neo.ClientError.General.ForbiddenOnFollower", "oh no!" ) ); + ClusteredStatementResult result = + new ClusteredStatementResult( delegate, LOCALHOST, onError ); + + // When + try + { + result.peek(); + fail(); + } + catch ( SessionExpiredException e ) + { + //ignore + } + + // Then + verify( onError ).onWriteFailure( LOCALHOST ); + verifyNoMoreInteractions( onError ); + } + + @Test + public void shouldHandleConnectionFailureOnSingle() + { + // Given + when( delegate.single() ).thenThrow( new ConnectionFailureException( "oh no" ) ); + ClusteredStatementResult result = + new ClusteredStatementResult( delegate, LOCALHOST, onError ); + + // When + try + { + result.single(); + fail(); + } + catch ( SessionExpiredException e ) + { + //ignore + } + + // Then + verify( onError ).onConnectionFailure( LOCALHOST ); + verifyNoMoreInteractions( onError ); + } + + @Test + public void shouldHandleWriteFailureOnSingle() + { + // Given + when( delegate.single() ) + .thenThrow( new ClientException( "Neo.ClientError.General.ForbiddenOnFollower", "oh no!" ) ); + ClusteredStatementResult result = + new ClusteredStatementResult( delegate, LOCALHOST, onError ); + + // When + try + { + result.single(); + fail(); + } + catch ( SessionExpiredException e ) + { + //ignore + } + + // Then + verify( onError ).onWriteFailure( LOCALHOST ); + verifyNoMoreInteractions( onError ); + } +} \ No newline at end of file diff --git a/driver/src/test/java/org/neo4j/driver/internal/DirectDriverTest.java b/driver/src/test/java/org/neo4j/driver/internal/DirectDriverTest.java index 731bb5c228..ba8f062cda 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/DirectDriverTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/DirectDriverTest.java @@ -24,7 +24,6 @@ import java.io.IOException; import java.net.URI; -import java.util.Collection; import org.neo4j.driver.internal.net.BoltServerAddress; import org.neo4j.driver.v1.Driver; @@ -49,13 +48,8 @@ public void shouldUseDefaultPortIfMissing() DirectDriver driver = (DirectDriver) GraphDatabase.driver( uri ); // Then - Collection addresses = driver.servers(); - assertThat( addresses.size(), equalTo( 1 ) ); - for ( BoltServerAddress address : addresses ) - { - assertThat( address.port(), equalTo( BoltServerAddress.DEFAULT_PORT ) ); - } - + BoltServerAddress address = driver.server(); + assertThat( address.port(), equalTo( BoltServerAddress.DEFAULT_PORT ) ); } @Test @@ -69,9 +63,8 @@ public void shouldRegisterSingleServer() DirectDriver driver = (DirectDriver) GraphDatabase.driver( uri ); // Then - Collection addresses = driver.servers(); - assertThat( addresses.size(), equalTo( 1 ) ); - assertThat( addresses.contains( address ), equalTo( true ) ); + BoltServerAddress driverAddress = driver.server(); + assertThat( driverAddress, equalTo( address )); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/ConcurrentRoundRobinSetTest.java b/driver/src/test/java/org/neo4j/driver/internal/util/ConcurrentRoundRobinSetTest.java new file mode 100644 index 0000000000..a2c1e32f0b --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/util/ConcurrentRoundRobinSetTest.java @@ -0,0 +1,170 @@ +/** + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.util; + + +import org.junit.Test; + +import java.util.Comparator; +import java.util.HashSet; + +import static java.util.Arrays.asList; +import static junit.framework.TestCase.assertFalse; +import static junit.framework.TestCase.assertTrue; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; + +public class ConcurrentRoundRobinSetTest +{ + + @Test + public void shouldBeAbleToIterateIndefinitely() + { + // Given + ConcurrentRoundRobinSet integers = new ConcurrentRoundRobinSet<>(); + + // When + integers.addAll( asList( 0, 1, 2, 3, 4 ) ); + + // Then + for ( int i = 0; i < 100; i++ ) + { + assertThat( integers.hop(), equalTo( i % 5 ) ); + } + } + + @Test + public void shouldBeAbleToUseCustomComparator() + { + // Given + ConcurrentRoundRobinSet integers = new ConcurrentRoundRobinSet<>( new Comparator() + { + @Override + public int compare( Integer o1, Integer o2 ) + { + return Integer.compare( o2, o1 ); + } + } ); + + // When + integers.addAll( asList( 0, 1, 2, 3, 4 ) ); + + // Then + assertThat( integers.hop(), equalTo( 4 ) ); + assertThat( integers.hop(), equalTo( 3 ) ); + assertThat( integers.hop(), equalTo( 2 ) ); + assertThat( integers.hop(), equalTo( 1 ) ); + assertThat( integers.hop(), equalTo( 0 ) ); + assertThat( integers.hop(), equalTo( 4 ) ); + assertThat( integers.hop(), equalTo( 3 ) ); + //.... + } + + @Test + public void shouldBeAbleToClearSet() + { + // Given + ConcurrentRoundRobinSet integers = new ConcurrentRoundRobinSet<>(); + + // When + integers.addAll( asList( 0, 1, 2, 3, 4 ) ); + integers.clear(); + + // Then + assertThat( integers, empty() ); + } + + @Test + public void shouldBeAbleToCheckIfContainsElement() + { + // Given + ConcurrentRoundRobinSet integers = new ConcurrentRoundRobinSet<>(); + + // When + integers.addAll( asList( 0, 1, 2, 3, 4 ) ); + + + // Then + assertTrue( integers.contains( 3 ) ); + assertFalse( integers.contains( 7 ) ); + } + + @Test + public void shouldBeAbleToCheckIfContainsMultipleElements() + { + // Given + ConcurrentRoundRobinSet integers = new ConcurrentRoundRobinSet<>(); + + // When + integers.addAll( asList( 0, 1, 2, 3, 4 ) ); + + + // Then + assertTrue( integers.containsAll( asList( 3, 1 ) ) ); + assertFalse( integers.containsAll( asList( 2, 3, 4, 7 ) ) ); + } + + @Test + public void shouldBeAbleToCheckIfEmptyAndSize() + { + // Given + ConcurrentRoundRobinSet integers = new ConcurrentRoundRobinSet<>(); + + // When + integers.addAll( asList( 0, 1, 2, 3, 4 ) ); + + + // Then + assertFalse( integers.isEmpty() ); + assertThat( integers.size(), equalTo( 5 ) ); + integers.clear(); + assertTrue( integers.isEmpty() ); + assertThat( integers.size(), equalTo( 0 ) ); + } + + + @Test + public void shouldBeAbleToCreateArray() + { + // Given + ConcurrentRoundRobinSet integers = new ConcurrentRoundRobinSet<>(); + + // When + integers.addAll( asList( 0, 1, 2, 3, 4 ) ); + Object[] objects = integers.toArray(); + + // Then + assertThat( objects, equalTo( new Object[]{0, 1, 2, 3, 4} ) ); + } + + @Test + public void shouldBeAbleToCreateTypedArray() + { + // Given + ConcurrentRoundRobinSet integers = new ConcurrentRoundRobinSet<>(); + + // When + integers.addAll( asList( 0, 1, 2, 3, 4 ) ); + Integer[] array = integers.toArray( new Integer[5] ); + + // Then + assertThat( array, equalTo( new Integer[]{0, 1, 2, 3, 4} ) ); + } +} \ No newline at end of file diff --git a/driver/src/test/java/org/neo4j/driver/v1/GraphDatabaseTest.java b/driver/src/test/java/org/neo4j/driver/v1/GraphDatabaseTest.java index 047f2c29d9..75dc59f68e 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/GraphDatabaseTest.java +++ b/driver/src/test/java/org/neo4j/driver/v1/GraphDatabaseTest.java @@ -54,7 +54,7 @@ public void boltPlusDiscoverySchemeShouldInstantiateClusterDriver() throws IOExc { // Given StubServer server = StubServer.start( "../driver/src/test/resources/discover_servers.script" ); - URI uri = URI.create( "bolt+discovery://localhost:7687" ); + URI uri = URI.create( "bolt+routing://localhost:7687" ); // When Driver driver = GraphDatabase.driver( uri ); diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java index ecba230bcf..83543355e0 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java @@ -21,7 +21,13 @@ import org.junit.Rule; import org.junit.Test; -import org.neo4j.driver.v1.*; +import org.neo4j.driver.v1.AuthToken; +import org.neo4j.driver.v1.AuthTokens; +import org.neo4j.driver.v1.Driver; +import org.neo4j.driver.v1.GraphDatabase; +import org.neo4j.driver.v1.Session; +import org.neo4j.driver.v1.StatementResult; +import org.neo4j.driver.v1.Transaction; import org.neo4j.driver.v1.exceptions.ClientException; import org.neo4j.driver.v1.exceptions.Neo4jException; import org.neo4j.driver.v1.util.TestNeo4j; @@ -101,8 +107,7 @@ public void shouldKillLongRunningStatement() throws Throwable final int killTimeout = 1; // 1s long startTime = -1, endTime; - final Session session = driver.session(); - try + try ( Session session = driver.session() ) { StatementResult result = session.run( "CALL test.driver.longRunningStatement({seconds})", @@ -114,9 +119,9 @@ public void shouldKillLongRunningStatement() throws Throwable startTime = System.currentTimeMillis(); result.consume();// blocking to run the statement - fail("Should have got an exception about statement get killed."); + fail( "Should have got an exception about statement get killed." ); } - catch( Neo4jException e ) + catch ( Neo4jException e ) { endTime = System.currentTimeMillis(); assertTrue( startTime > 0 ); @@ -127,10 +132,6 @@ public void shouldKillLongRunningStatement() throws Throwable { fail( "Should be a Neo4jException" ); } - finally - { - session.close(); - } } @Test diff --git a/driver/src/test/resources/acquire_endpoints.script b/driver/src/test/resources/acquire_endpoints.script new file mode 100644 index 0000000000..c34f2f9b33 --- /dev/null +++ b/driver/src/test/resources/acquire_endpoints.script @@ -0,0 +1,16 @@ +!: AUTO INIT +!: AUTO RESET +!: AUTO RUN "RETURN 1 // JavaDriver poll to test connection" {} +!: AUTO PULL_ALL + +C: RUN "CALL dbms.cluster.routing.getServers" {} + PULL_ALL +S: SUCCESS {"fields": ["address", "mode", "expires"]} + RECORD ["127.0.0.1:9007", "WRITE",9223372036854775807] + RECORD ["127.0.0.1:9008", "WRITE",9223372036854775807] + RECORD ["127.0.0.1:9005", "READ",9223372036854775807] + RECORD ["127.0.0.1:9006", "READ",9223372036854775807] + RECORD ["127.0.0.1:9001", "ROUTE",9223372036854775807] + RECORD ["127.0.0.1:9002", "ROUTE",9223372036854775807] + RECORD ["127.0.0.1:9003", "ROUTE",9223372036854775807] + SUCCESS {} \ No newline at end of file diff --git a/driver/src/test/resources/acquire_read_endpoint.script b/driver/src/test/resources/acquire_read_endpoint.script deleted file mode 100644 index 656e8e355f..0000000000 --- a/driver/src/test/resources/acquire_read_endpoint.script +++ /dev/null @@ -1,12 +0,0 @@ -!: AUTO INIT -!: AUTO RESET -!: AUTO RUN "RETURN 1 // JavaDriver poll to test connection" {} -!: AUTO PULL_ALL - -C: RUN "CALL dbms.cluster.discoverMembers" {} - PULL_ALL -S: SUCCESS {"fields": ["address"]} - RECORD ["127.0.0.1:9001"] - RECORD ["127.0.0.1:9002"] - RECORD ["127.0.0.1:9003"] - SUCCESS {} diff --git a/driver/src/test/resources/dead_server.script b/driver/src/test/resources/dead_server.script new file mode 100644 index 0000000000..950c4dc88b --- /dev/null +++ b/driver/src/test/resources/dead_server.script @@ -0,0 +1,8 @@ +!: AUTO INIT +!: AUTO RESET +!: AUTO RUN "RETURN 1 // JavaDriver poll to test connection" {} +!: AUTO PULL_ALL + +C: RUN "MATCH (n) RETURN n.name" {} +C: PULL_ALL +S: \ No newline at end of file diff --git a/driver/src/test/resources/discover_invalid_server.script b/driver/src/test/resources/discover_invalid_server.script index c3ecbeb0b5..ad6080e9ba 100644 --- a/driver/src/test/resources/discover_invalid_server.script +++ b/driver/src/test/resources/discover_invalid_server.script @@ -3,10 +3,10 @@ !: AUTO RUN "RETURN 1 // JavaDriver poll to test connection" {} !: AUTO PULL_ALL -C: RUN "CALL dbms.cluster.discoverMembers" {} +C: RUN "CALL dbms.cluster.discoverEndpointAcquisitionServers" {} PULL_ALL S: SUCCESS {"fields": ["address"]} RECORD ["127.0.0.1:9001"] SUCCESS {} -C: RUN "CALL dbms.cluster.discoverMembers" {} +C: RUN "CALL dbms.cluster.discoverEndpointAcquisitionServers" {} PULL_ALL diff --git a/driver/src/test/resources/discover_new_servers.script b/driver/src/test/resources/discover_new_servers.script index 034ff3b962..14cf676417 100644 --- a/driver/src/test/resources/discover_new_servers.script +++ b/driver/src/test/resources/discover_new_servers.script @@ -3,10 +3,13 @@ !: AUTO RUN "RETURN 1 // JavaDriver poll to test connection" {} !: AUTO PULL_ALL -C: RUN "CALL dbms.cluster.discoverMembers" {} +C: RUN "CALL dbms.cluster.routing.getServers" {} PULL_ALL -S: SUCCESS {"fields": ["address"]} - RECORD ["127.0.0.1:9004"] - RECORD ["127.0.0.1:9002"] - RECORD ["127.0.0.1:9003"] +S: SUCCESS {"fields": ["address", "mode", "expires"]} + RECORD ["127.0.0.1:9002", "WRITE",9223372036854775807] + RECORD ["127.0.0.1:9005", "READ",9223372036854775807] + RECORD ["127.0.0.1:9003", "READ",9223372036854775807] + RECORD ["127.0.0.1:9004", "ROUTE",9223372036854775807] + RECORD ["127.0.0.1:9002", "ROUTE",9223372036854775807] + RECORD ["127.0.0.1:9003", "ROUTE",9223372036854775807] SUCCESS {} diff --git a/driver/src/test/resources/discover_servers.script b/driver/src/test/resources/discover_servers.script index 656e8e355f..8f98264424 100644 --- a/driver/src/test/resources/discover_servers.script +++ b/driver/src/test/resources/discover_servers.script @@ -3,10 +3,13 @@ !: AUTO RUN "RETURN 1 // JavaDriver poll to test connection" {} !: AUTO PULL_ALL -C: RUN "CALL dbms.cluster.discoverMembers" {} +C: RUN "CALL dbms.cluster.routing.getServers" {} PULL_ALL -S: SUCCESS {"fields": ["address"]} - RECORD ["127.0.0.1:9001"] - RECORD ["127.0.0.1:9002"] - RECORD ["127.0.0.1:9003"] +S: SUCCESS {"fields": ["address", "mode", "expires"]} + RECORD ["127.0.0.1:9001", "WRITE",9223372036854775807] + RECORD ["127.0.0.1:9002", "READ",9223372036854775807] + RECORD ["127.0.0.1:9003", "READ",9223372036854775807] + RECORD ["127.0.0.1:9001", "ROUTE",9223372036854775807] + RECORD ["127.0.0.1:9002", "ROUTE",9223372036854775807] + RECORD ["127.0.0.1:9003", "ROUTE",9223372036854775807] SUCCESS {} diff --git a/driver/src/test/resources/handle_empty_response.script b/driver/src/test/resources/handle_empty_response.script index 4fc62f9943..8f64f8853e 100644 --- a/driver/src/test/resources/handle_empty_response.script +++ b/driver/src/test/resources/handle_empty_response.script @@ -3,7 +3,7 @@ !: AUTO RUN "RETURN 1 // JavaDriver poll to test connection" {} !: AUTO PULL_ALL -C: RUN "CALL dbms.cluster.discoverMembers" {} +C: RUN "CALL dbms.cluster.routing.getServers" {} PULL_ALL -S: SUCCESS {"fields": ["address"]} +S: SUCCESS {"fields": ["address", "mode", "expires"]} SUCCESS {} diff --git a/driver/src/test/resources/non_discovery_server.script b/driver/src/test/resources/non_discovery_server.script new file mode 100644 index 0000000000..79b9c74fdb --- /dev/null +++ b/driver/src/test/resources/non_discovery_server.script @@ -0,0 +1,11 @@ +!: AUTO INIT +!: AUTO RESET +!: AUTO RUN "RETURN 1 // JavaDriver poll to test connection" {} +!: AUTO PULL_ALL + +C: RUN "CALL dbms.cluster.routing.getServers" {} +C: PULL_ALL +S: FAILURE {"code": "Neo.ClientError.Procedure.ProcedureNotFound", "message": "blabla"} +S: IGNORED +C: ACK_FAILURE +S: SUCCESS {} \ No newline at end of file diff --git a/driver/src/test/resources/not_able_to_write_server.script b/driver/src/test/resources/not_able_to_write_server.script new file mode 100644 index 0000000000..a3ea552e1b --- /dev/null +++ b/driver/src/test/resources/not_able_to_write_server.script @@ -0,0 +1,11 @@ +!: AUTO INIT +!: AUTO RESET +!: AUTO RUN "RETURN 1 // JavaDriver poll to test connection" {} +!: AUTO PULL_ALL + +C: RUN "CREATE ()" {} +C: PULL_ALL +S: FAILURE {"code": "Neo.ClientError.General.ForbiddenOnFollower", "message": "blabla"} +S: IGNORED +C: ACK_FAILURE +S: SUCCESS {} diff --git a/driver/src/test/resources/read_server.script b/driver/src/test/resources/read_server.script new file mode 100644 index 0000000000..17e2d3a22c --- /dev/null +++ b/driver/src/test/resources/read_server.script @@ -0,0 +1,12 @@ +!: AUTO INIT +!: AUTO RESET +!: AUTO RUN "RETURN 1 // JavaDriver poll to test connection" {} +!: AUTO PULL_ALL + +C: RUN "MATCH (n) RETURN n.name" {} + PULL_ALL +S: SUCCESS {"fields": ["n.name"]} + RECORD ["Bob"] + RECORD ["Alice"] + RECORD ["Tina"] + SUCCESS {} \ No newline at end of file diff --git a/driver/src/test/resources/rediscover.script b/driver/src/test/resources/rediscover.script new file mode 100644 index 0000000000..8ef0f3cabe --- /dev/null +++ b/driver/src/test/resources/rediscover.script @@ -0,0 +1,20 @@ +!: AUTO INIT +!: AUTO RESET +!: AUTO RUN "RETURN 1 // JavaDriver poll to test connection" {} +!: AUTO PULL_ALL + +C: RUN "CALL dbms.cluster.routing.getServers" {} + PULL_ALL +S: SUCCESS {"fields": ["address", "mode", "expires"]} + RECORD ["127.0.0.1:9001", "ROUTE",9223372036854775807] + SUCCESS {} +C: RUN "CALL dbms.cluster.routing.getServers" {} + PULL_ALL +S: SUCCESS {"fields": ["address", "mode", "expires"]} + RECORD ["127.0.0.1:9004", "WRITE",9223372036854775807] + RECORD ["127.0.0.1:9005", "READ",9223372036854775807] + RECORD ["127.0.0.1:9001", "ROUTE",9223372036854775807] + RECORD ["127.0.0.1:9002", "ROUTE",9223372036854775807] + RECORD ["127.0.0.1:9003", "ROUTE",9223372036854775807] + RECORD ["127.0.0.1:9004", "ROUTE",9223372036854775807] + SUCCESS {} diff --git a/driver/src/test/resources/write_server.script b/driver/src/test/resources/write_server.script new file mode 100644 index 0000000000..27edbefa0c --- /dev/null +++ b/driver/src/test/resources/write_server.script @@ -0,0 +1,9 @@ +!: AUTO INIT +!: AUTO RESET +!: AUTO RUN "RETURN 1 // JavaDriver poll to test connection" {} +!: AUTO PULL_ALL + +C: RUN "CREATE (n {name:'Bob'})" {} + PULL_ALL +S: SUCCESS {} + SUCCESS {} \ No newline at end of file