diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java b/driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java index 3d3eeafe8f..12ce4d1a9d 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java @@ -22,15 +22,18 @@ import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.neo4j.driver.Logger; import org.neo4j.driver.Logging; @@ -49,6 +52,8 @@ import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setAuthorizationStateListener; import static org.neo4j.driver.internal.util.Futures.combineErrors; import static org.neo4j.driver.internal.util.Futures.completeWithNullIfNoError; +import static org.neo4j.driver.internal.util.LockUtil.executeWithLock; +import static org.neo4j.driver.internal.util.LockUtil.executeWithLockAsync; public class ConnectionPoolImpl implements ConnectionPool { @@ -63,7 +68,8 @@ public class ConnectionPoolImpl implements ConnectionPool private final MetricsListener metricsListener; private final boolean ownsEventLoopGroup; - private final ConcurrentMap pools = new ConcurrentHashMap<>(); + private final ReadWriteLock addressToPoolLock = new ReentrantReadWriteLock(); + private final Map addressToPool = new HashMap<>(); private final AtomicBoolean closed = new AtomicBoolean(); private final CompletableFuture closeFuture = new CompletableFuture<>(); private final ConnectionFactory connectionFactory; @@ -126,25 +132,32 @@ public CompletionStage acquire( BoltServerAddress address ) @Override public void retainAll( Set addressesToRetain ) { - for ( BoltServerAddress address : pools.keySet() ) + executeWithLock( addressToPoolLock.writeLock(), () -> { - if ( !addressesToRetain.contains( address ) ) + Iterator> entryIterator = addressToPool.entrySet().iterator(); + while ( entryIterator.hasNext() ) { - int activeChannels = nettyChannelTracker.inUseChannelCount( address ); - if ( activeChannels == 0 ) + Map.Entry entry = entryIterator.next(); + BoltServerAddress address = entry.getKey(); + if ( !addressesToRetain.contains( address ) ) { - // address is not present in updated routing table and has no active connections - // it's now safe to terminate corresponding connection pool and forget about it - ExtendedChannelPool pool = pools.remove( address ); - if ( pool != null ) + int activeChannels = nettyChannelTracker.inUseChannelCount( address ); + if ( activeChannels == 0 ) { - log.info( "Closing connection pool towards %s, it has no active connections " + - "and is not in the routing table registry.", address ); - closePoolInBackground( address, pool ); + // address is not present in updated routing table and has no active connections + // it's now safe to terminate corresponding connection pool and forget about it + ExtendedChannelPool pool = entry.getValue(); + entryIterator.remove(); + if ( pool != null ) + { + log.info( "Closing connection pool towards %s, it has no active connections " + + "and is not in the routing table registry.", address ); + closePoolInBackground( address, pool ); + } } } } - } + } ); } @Override @@ -165,21 +178,26 @@ public CompletionStage close() if ( closed.compareAndSet( false, true ) ) { nettyChannelTracker.prepareToCloseChannels(); - CompletableFuture allPoolClosedFuture = closeAllPools(); - // We can only shutdown event loop group when all netty pools are fully closed, - // otherwise the netty pools might missing threads (from event loop group) to execute clean ups. - allPoolClosedFuture.whenComplete( ( ignored, pollCloseError ) -> { - pools.clear(); - if ( !ownsEventLoopGroup ) - { - completeWithNullIfNoError( closeFuture, pollCloseError ); - } - else - { - shutdownEventLoopGroup( pollCloseError ); - } - } ); + executeWithLockAsync( addressToPoolLock.writeLock(), + () -> + { + // We can only shutdown event loop group when all netty pools are fully closed, + // otherwise the netty pools might missing threads (from event loop group) to execute clean ups. + return closeAllPools().whenComplete( + ( ignored, pollCloseError ) -> + { + addressToPool.clear(); + if ( !ownsEventLoopGroup ) + { + completeWithNullIfNoError( closeFuture, pollCloseError ); + } + else + { + shutdownEventLoopGroup( pollCloseError ); + } + } ); + } ); } return closeFuture; } @@ -187,13 +205,13 @@ public CompletionStage close() @Override public boolean isOpen( BoltServerAddress address ) { - return pools.containsKey( address ); + return executeWithLock( addressToPoolLock.readLock(), () -> addressToPool.containsKey( address ) ); } @Override public String toString() { - return "ConnectionPoolImpl{" + "pools=" + pools + '}'; + return executeWithLock( addressToPoolLock.readLock(), () -> "ConnectionPoolImpl{" + "pools=" + addressToPool + '}' ); } private void processAcquisitionError( ExtendedChannelPool pool, BoltServerAddress serverAddress, Throwable error ) @@ -239,7 +257,7 @@ private void assertNotClosed( BoltServerAddress address, Channel channel, Extend { pool.release( channel ); closePoolInBackground( address, pool ); - pools.remove( address ); + executeWithLock( addressToPoolLock.writeLock(), () -> addressToPool.remove( address ) ); assertNotClosed(); } } @@ -247,7 +265,7 @@ private void assertNotClosed( BoltServerAddress address, Channel channel, Extend // for testing only ExtendedChannelPool getPool( BoltServerAddress address ) { - return pools.get( address ); + return executeWithLock( addressToPoolLock.readLock(), () -> addressToPool.get( address ) ); } ExtendedChannelPool newPool( BoltServerAddress address ) @@ -258,12 +276,22 @@ ExtendedChannelPool newPool( BoltServerAddress address ) private ExtendedChannelPool getOrCreatePool( BoltServerAddress address ) { - return pools.computeIfAbsent( address, ignored -> { - ExtendedChannelPool pool = newPool( address ); - // before the connection pool is added I can add the metrics for the pool. - metricsListener.putPoolMetrics( pool.id(), address, this ); - return pool; - } ); + ExtendedChannelPool existingPool = executeWithLock( addressToPoolLock.readLock(), () -> addressToPool.get( address ) ); + return existingPool != null + ? existingPool + : executeWithLock( addressToPoolLock.writeLock(), + () -> + { + ExtendedChannelPool pool = addressToPool.get( address ); + if ( pool == null ) + { + pool = newPool( address ); + // before the connection pool is added I can add the metrics for the pool. + metricsListener.putPoolMetrics( pool.id(), address, this ); + addressToPool.put( address, pool ); + } + return pool; + } ); } private CompletionStage closePool( ExtendedChannelPool pool ) @@ -305,12 +333,15 @@ private void shutdownEventLoopGroup( Throwable pollCloseError ) private CompletableFuture closeAllPools() { return CompletableFuture.allOf( - pools.entrySet().stream().map( entry -> { - BoltServerAddress address = entry.getKey(); - ExtendedChannelPool pool = entry.getValue(); - log.info( "Closing connection pool towards %s", address ); - // Wait for all pools to be closed. - return closePool( pool ).toCompletableFuture(); - } ).toArray( CompletableFuture[]::new ) ); + addressToPool.entrySet().stream() + .map( entry -> + { + BoltServerAddress address = entry.getKey(); + ExtendedChannelPool pool = entry.getValue(); + log.info( "Closing connection pool towards %s", address ); + // Wait for all pools to be closed. + return closePool( pool ).toCompletableFuture(); + } ) + .toArray( CompletableFuture[]::new ) ); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/AddressSet.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/AddressSet.java deleted file mode 100644 index c4cc3f2b20..0000000000 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/AddressSet.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Copyright (c) "Neo4j" - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.driver.internal.cluster; - -import java.util.Arrays; -import java.util.Iterator; -import java.util.Set; - -import org.neo4j.driver.internal.BoltServerAddress; - -public class AddressSet -{ - private static final BoltServerAddress[] NONE = {}; - - private volatile BoltServerAddress[] addresses = NONE; - - public BoltServerAddress[] toArray() - { - return addresses; - } - - public int size() - { - return addresses.length; - } - - public synchronized void retainAllAndAdd( Set newAddresses ) - { - BoltServerAddress[] addressesArr = new BoltServerAddress[newAddresses.size()]; - int insertionIdx = 0; - for ( BoltServerAddress address : addresses ) - { - if ( newAddresses.remove( address ) ) - { - addressesArr[insertionIdx] = address; - insertionIdx++; - } - } - Iterator addressIterator = newAddresses.iterator(); - for ( ; insertionIdx < addressesArr.length && addressIterator.hasNext(); insertionIdx++ ) - { - addressesArr[insertionIdx] = addressIterator.next(); - } - addresses = addressesArr; - } - - public synchronized void replaceIfPresent( BoltServerAddress oldAddress, BoltServerAddress newAddress ) - { - for ( int i = 0; i < addresses.length; i++ ) - { - if ( addresses[i].equals( oldAddress ) ) - { - addresses[i] = newAddress; - } - } - } - - public synchronized void remove( BoltServerAddress address ) - { - BoltServerAddress[] addresses = this.addresses; - if ( addresses != null ) - { - for ( int i = 0; i < addresses.length; i++ ) - { - if ( addresses[i].equals( address ) ) - { - if ( addresses.length == 1 ) - { - this.addresses = NONE; - return; - } - BoltServerAddress[] copy = new BoltServerAddress[addresses.length - 1]; - System.arraycopy( addresses, 0, copy, 0, i ); - System.arraycopy( addresses, i + 1, copy, i, addresses.length - i - 1 ); - this.addresses = copy; - return; - } - } - } - } - - @Override - public String toString() - { - return "AddressSet=" + Arrays.toString( addresses ); - } -} diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java index 3604a5ffc9..a1caf44aa0 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java @@ -18,10 +18,15 @@ */ package org.neo4j.driver.internal.cluster; +import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; -import java.util.LinkedHashSet; +import java.util.List; import java.util.Set; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.neo4j.driver.AccessMode; import org.neo4j.driver.internal.BoltServerAddress; @@ -30,24 +35,27 @@ import static java.lang.String.format; import static java.util.Arrays.asList; +import static org.neo4j.driver.internal.util.LockUtil.executeWithLock; public class ClusterRoutingTable implements RoutingTable { private static final int MIN_ROUTERS = 1; + private final ReadWriteLock tableLock = new ReentrantReadWriteLock(); + private final DatabaseName databaseName; private final Clock clock; - private volatile long expirationTimestamp; - private final AddressSet readers; - private final AddressSet writers; - private final AddressSet routers; + private final Set disused = new HashSet<>(); - private final DatabaseName databaseName; // specifies the database this routing table is acquired for - private boolean preferInitialRouter; + private long expirationTimestamp; + private boolean preferInitialRouter = true; + private List readers = Collections.emptyList(); + private List writers = Collections.emptyList(); + private List routers = Collections.emptyList(); public ClusterRoutingTable( DatabaseName ofDatabase, Clock clock, BoltServerAddress... routingAddresses ) { this( ofDatabase, clock ); - routers.retainAllAndAdd( new LinkedHashSet<>( asList( routingAddresses ) ) ); + routers = Collections.unmodifiableList( asList( routingAddresses ) ); } private ClusterRoutingTable( DatabaseName ofDatabase, Clock clock ) @@ -55,77 +63,85 @@ private ClusterRoutingTable( DatabaseName ofDatabase, Clock clock ) this.databaseName = ofDatabase; this.clock = clock; this.expirationTimestamp = clock.millis() - 1; - this.preferInitialRouter = true; - - this.readers = new AddressSet(); - this.writers = new AddressSet(); - this.routers = new AddressSet(); } @Override public boolean isStaleFor( AccessMode mode ) { - return expirationTimestamp < clock.millis() || - routers.size() < MIN_ROUTERS || - mode == AccessMode.READ && readers.size() == 0 || - mode == AccessMode.WRITE && writers.size() == 0; + return executeWithLock( tableLock.readLock(), () -> + expirationTimestamp < clock.millis() || + routers.size() < MIN_ROUTERS || + mode == AccessMode.READ && readers.size() == 0 || + mode == AccessMode.WRITE && writers.size() == 0 ); } @Override public boolean hasBeenStaleFor( long extraTime ) { - long totalTime = expirationTimestamp + extraTime; + long totalTime = executeWithLock( tableLock.readLock(), () -> expirationTimestamp ) + extraTime; if ( totalTime < 0 ) { totalTime = Long.MAX_VALUE; } - return totalTime < clock.millis(); + return totalTime < clock.millis(); } @Override - public synchronized void update( ClusterComposition cluster ) + public void update( ClusterComposition cluster ) { - expirationTimestamp = cluster.expirationTimestamp(); - readers.retainAllAndAdd( cluster.readers() ); - writers.retainAllAndAdd( cluster.writers() ); - routers.retainAllAndAdd( cluster.routers() ); - preferInitialRouter = !cluster.hasWriters(); + executeWithLock( tableLock.writeLock(), () -> + { + expirationTimestamp = cluster.expirationTimestamp(); + readers = newWithReusedAddresses( readers, disused, cluster.readers() ); + writers = newWithReusedAddresses( writers, disused, cluster.writers() ); + routers = newWithReusedAddresses( routers, disused, cluster.routers() ); + disused.clear(); + preferInitialRouter = !cluster.hasWriters(); + } ); } @Override - public synchronized void forget( BoltServerAddress address ) + public void forget( BoltServerAddress address ) { - routers.remove( address ); - readers.remove( address ); - writers.remove( address ); + executeWithLock( tableLock.writeLock(), () -> + { + routers = newWithoutAddressIfPresent( routers, address ); + readers = newWithoutAddressIfPresent( readers, address ); + writers = newWithoutAddressIfPresent( writers, address ); + disused.add( address ); + } ); } @Override - public AddressSet readers() + public List readers() { - return readers; + return executeWithLock( tableLock.readLock(), () -> readers ); } @Override - public AddressSet writers() + public List writers() { - return writers; + return executeWithLock( tableLock.readLock(), () -> writers ); } @Override - public AddressSet routers() + public List routers() { - return routers; + return executeWithLock( tableLock.readLock(), () -> routers ); } @Override public Set servers() { - Set servers = new HashSet<>(); - Collections.addAll( servers, readers.toArray() ); - Collections.addAll( servers, writers.toArray() ); - Collections.addAll( servers, routers.toArray() ); - return servers; + return executeWithLock( tableLock.readLock(), () -> + { + Set servers = new HashSet<>(); + servers.addAll( readers ); + servers.addAll( writers ); + servers.addAll( routers ); + servers.addAll( disused ); + return servers; + } ); } @Override @@ -137,25 +153,69 @@ public DatabaseName database() @Override public void forgetWriter( BoltServerAddress toRemove ) { - writers.remove( toRemove ); + executeWithLock( tableLock.writeLock(), () -> + { + writers = newWithoutAddressIfPresent( writers, toRemove ); + disused.add( toRemove ); + } ); } @Override public void replaceRouterIfPresent( BoltServerAddress oldRouter, BoltServerAddress newRouter ) { - routers.replaceIfPresent( oldRouter, newRouter ); + executeWithLock( tableLock.writeLock(), () -> routers = newWithAddressReplacedIfPresent( routers, oldRouter, newRouter ) ); } @Override public boolean preferInitialRouter() { - return preferInitialRouter; + return executeWithLock( tableLock.readLock(), () -> preferInitialRouter ); } @Override - public synchronized String toString() + public String toString() + { + return executeWithLock( tableLock.readLock(), () -> + format( "Ttl %s, currentTime %s, routers %s, writers %s, readers %s, database '%s'", + expirationTimestamp, clock.millis(), routers, writers, readers, databaseName.description() ) ); + } + + private List newWithoutAddressIfPresent( List addresses, BoltServerAddress addressToSkip ) + { + List newList = new ArrayList<>( addresses.size() ); + for ( BoltServerAddress address : addresses ) + { + if ( !address.equals( addressToSkip ) ) + { + newList.add( address ); + } + } + return Collections.unmodifiableList( newList ); + } + + private List newWithAddressReplacedIfPresent( List addresses, BoltServerAddress oldAddress, + BoltServerAddress newAddress ) + { + List newList = new ArrayList<>( addresses.size() ); + for ( BoltServerAddress address : addresses ) + { + newList.add( address.equals( oldAddress ) ? newAddress : address ); + } + return Collections.unmodifiableList( newList ); + } + + private List newWithReusedAddresses( List currentAddresses, Set disusedAddresses, + Set newAddresses ) + { + List newList = Stream.concat( currentAddresses.stream(), disusedAddresses.stream() ) + .filter( address -> newAddresses.remove( toBoltServerAddress( address ) ) ) + .collect( Collectors.toCollection( () -> new ArrayList<>( newAddresses.size() ) ) ); + newList.addAll( newAddresses ); + return Collections.unmodifiableList( newList ); + } + + private BoltServerAddress toBoltServerAddress( BoltServerAddress address ) { - return format( "Ttl %s, currentTime %s, routers %s, writers %s, readers %s, database '%s'", - expirationTimestamp, clock.millis(), routers, writers, readers, databaseName.description() ); + return BoltServerAddress.class.equals( address.getClass() ) ? address : new BoltServerAddress( address.host(), address.port() ); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/RediscoveryImpl.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/RediscoveryImpl.java index adb06f7372..8e7d37952c 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/RediscoveryImpl.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/RediscoveryImpl.java @@ -199,10 +199,8 @@ private CompletionStage lookupOnKnownRouters( Ro Set seenServers, Bookmark bookmark, Throwable baseError ) { - BoltServerAddress[] addresses = routingTable.routers().toArray(); - CompletableFuture result = completedWithNull(); - for ( BoltServerAddress address : addresses ) + for ( BoltServerAddress address : routingTable.routers() ) { result = result .thenCompose( diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java index 7fa7000bda..04c48d228c 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java @@ -18,6 +18,7 @@ */ package org.neo4j.driver.internal.cluster; +import java.util.List; import java.util.Set; import org.neo4j.driver.AccessMode; @@ -34,12 +35,34 @@ public interface RoutingTable void forget( BoltServerAddress address ); - AddressSet readers(); + /** + * Returns an immutable list of reader addresses. + * + * @return the immutable list of reader addresses. + */ + List readers(); - AddressSet writers(); + /** + * Returns an immutable list of writer addresses. + * + * @return the immutable list of write addresses. + */ - AddressSet routers(); + List writers(); + /** + * Returns an immutable list of router addresses. + * + * @return the immutable list of router addresses. + */ + + List routers(); + + /** + * Returns an immutable unordered set of all addresses known by this routing table. This includes all router, reader, writer and disused addresses. + * + * @return the immutable set of all addresses. + */ Set servers(); DatabaseName database(); diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTableHandlerImpl.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTableHandlerImpl.java index 7605b48ee8..0e8b1f0bf2 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTableHandlerImpl.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTableHandlerImpl.java @@ -112,6 +112,7 @@ private synchronized void freshClusterCompositionFetched( ClusterCompositionLook { try { + log.debug( "Fetched cluster composition for database '%s'. %s", databaseName.description(), compositionLookupResult.getClusterComposition() ); routingTable.update( compositionLookupResult.getClusterComposition() ); routingTableRegistry.removeAged(); @@ -142,7 +143,8 @@ private synchronized void freshClusterCompositionFetched( ClusterCompositionLook private synchronized void clusterCompositionLookupFailed( Throwable error ) { - log.error( String.format( "Failed to update routing table for database '%s'. Current routing table: %s.", databaseName.description(), routingTable ), error ); + log.error( String.format( "Failed to update routing table for database '%s'. Current routing table: %s.", databaseName.description(), routingTable ), + error ); routingTableRegistry.remove( databaseName ); CompletableFuture routingTableFuture = refreshRoutingTableFuture; refreshRoutingTableFuture = null; diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LeastConnectedLoadBalancingStrategy.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LeastConnectedLoadBalancingStrategy.java index fe30c1ae59..3788771d74 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LeastConnectedLoadBalancingStrategy.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LeastConnectedLoadBalancingStrategy.java @@ -18,15 +18,17 @@ */ package org.neo4j.driver.internal.cluster.loadbalancing; -import org.neo4j.driver.internal.BoltServerAddress; -import org.neo4j.driver.internal.spi.ConnectionPool; +import java.util.List; + import org.neo4j.driver.Logger; import org.neo4j.driver.Logging; +import org.neo4j.driver.internal.BoltServerAddress; +import org.neo4j.driver.internal.spi.ConnectionPool; /** - * Load balancing strategy that finds server with least amount of active (checked out of the pool) connections from - * given readers or writers. It finds a start index for iteration in a round-robin fashion. This is done to prevent - * choosing same first address over and over when all addresses have same amount of active connections. + * Load balancing strategy that finds server with the least amount of active (checked out of the pool) connections from given readers or writers. It finds a + * start index for iteration in a round-robin fashion. This is done to prevent choosing same first address over and over when all addresses have the same amount + * of active connections. */ public class LeastConnectedLoadBalancingStrategy implements LoadBalancingStrategy { @@ -45,21 +47,21 @@ public LeastConnectedLoadBalancingStrategy( ConnectionPool connectionPool, Loggi } @Override - public BoltServerAddress selectReader( BoltServerAddress[] knownReaders ) + public BoltServerAddress selectReader( List knownReaders ) { return select( knownReaders, readersIndex, "reader" ); } @Override - public BoltServerAddress selectWriter( BoltServerAddress[] knownWriters ) + public BoltServerAddress selectWriter( List knownWriters ) { return select( knownWriters, writersIndex, "writer" ); } - private BoltServerAddress select( BoltServerAddress[] addresses, RoundRobinArrayIndex addressesIndex, - String addressType ) + private BoltServerAddress select( List addresses, RoundRobinArrayIndex addressesIndex, + String addressType ) { - int size = addresses.length; + int size = addresses.size(); if ( size == 0 ) { log.trace( "Unable to select %s, no known addresses given", addressType ); @@ -73,10 +75,10 @@ private BoltServerAddress select( BoltServerAddress[] addresses, RoundRobinArray BoltServerAddress leastConnectedAddress = null; int leastActiveConnections = Integer.MAX_VALUE; - // iterate over the array to find least connected address + // iterate over the array to find the least connected address do { - BoltServerAddress address = addresses[index]; + BoltServerAddress address = addresses.get( index ); int activeConnections = connectionPool.inUseConnections( address ); if ( activeConnections < leastActiveConnections ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java index 16a741efb8..004fe57dcf 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java @@ -35,7 +35,6 @@ import org.neo4j.driver.internal.DomainNameResolver; import org.neo4j.driver.internal.async.ConnectionContext; import org.neo4j.driver.internal.async.connection.RoutingConnection; -import org.neo4j.driver.internal.cluster.AddressSet; import org.neo4j.driver.internal.cluster.ClusterCompositionProvider; import org.neo4j.driver.internal.cluster.Rediscovery; import org.neo4j.driver.internal.cluster.RediscoveryImpl; @@ -68,6 +67,7 @@ public class LoadBalancer implements ConnectionProvider "Failed to obtain connection towards %s server. Known routing table is: %s"; private static final String CONNECTION_ACQUISITION_ATTEMPT_FAILURE_MESSAGE = "Failed to obtain a connection towards address %s, will try other addresses if available. Complete failure is reported separately from this entry."; + private static final BoltServerAddress[] BOLT_SERVER_ADDRESSES_EMPTY_ARRAY = new BoltServerAddress[0]; private final ConnectionPool connectionPool; private final RoutingTableRegistry routingTables; private final LoadBalancingStrategy loadBalancingStrategy; @@ -192,16 +192,15 @@ private CompletionStage supportsMultiDb( BoltServerAddress address ) private CompletionStage acquire( AccessMode mode, RoutingTable routingTable ) { - AddressSet addresses = addressSet( mode, routingTable ); CompletableFuture result = new CompletableFuture<>(); List attemptExceptions = new ArrayList<>(); - acquire( mode, routingTable, addresses, result, attemptExceptions ); + acquire( mode, routingTable, result, attemptExceptions ); return result; } - private void acquire( AccessMode mode, RoutingTable routingTable, AddressSet addresses, CompletableFuture result, - List attemptErrors ) + private void acquire( AccessMode mode, RoutingTable routingTable, CompletableFuture result, List attemptErrors ) { + List addresses = getAddressesByMode( mode, routingTable ); BoltServerAddress address = selectAddress( mode, addresses ); if ( address == null ) @@ -226,7 +225,7 @@ private void acquire( AccessMode mode, RoutingTable routingTable, AddressSet add log.debug( attemptMessage, error ); attemptErrors.add( error ); routingTable.forget( address ); - eventExecutorGroup.next().execute( () -> acquire( mode, routingTable, addresses, result, attemptErrors ) ); + eventExecutorGroup.next().execute( () -> acquire( mode, routingTable, result, attemptErrors ) ); } else { @@ -240,7 +239,7 @@ private void acquire( AccessMode mode, RoutingTable routingTable, AddressSet add } ); } - private static AddressSet addressSet( AccessMode mode, RoutingTable routingTable ) + private static List getAddressesByMode( AccessMode mode, RoutingTable routingTable ) { switch ( mode ) { @@ -253,10 +252,8 @@ private static AddressSet addressSet( AccessMode mode, RoutingTable routingTable } } - private BoltServerAddress selectAddress( AccessMode mode, AddressSet servers ) + private BoltServerAddress selectAddress( AccessMode mode, List addresses ) { - BoltServerAddress[] addresses = servers.toArray(); - switch ( mode ) { case READ: diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancingStrategy.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancingStrategy.java index dbaecdb08f..d05189e79c 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancingStrategy.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancingStrategy.java @@ -18,6 +18,8 @@ */ package org.neo4j.driver.internal.cluster.loadbalancing; +import java.util.List; + import org.neo4j.driver.internal.BoltServerAddress; /** @@ -31,7 +33,7 @@ public interface LoadBalancingStrategy * @param knownReaders array of all known readers. * @return most appropriate reader or {@code null} if it can't be selected. */ - BoltServerAddress selectReader( BoltServerAddress[] knownReaders ); + BoltServerAddress selectReader( List knownReaders ); /** * Select most appropriate write address from the given array of addresses. @@ -39,5 +41,5 @@ public interface LoadBalancingStrategy * @param knownWriters array of all known writers. * @return most appropriate writer or {@code null} if it can't be selected. */ - BoltServerAddress selectWriter( BoltServerAddress[] knownWriters ); + BoltServerAddress selectWriter( List knownWriters ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/summary/InternalSummaryCounters.java b/driver/src/main/java/org/neo4j/driver/internal/summary/InternalSummaryCounters.java index 63bb0c76c3..fb90ea1cb1 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/summary/InternalSummaryCounters.java +++ b/driver/src/main/java/org/neo4j/driver/internal/summary/InternalSummaryCounters.java @@ -204,4 +204,23 @@ private boolean isPositive( int value ) { return value > 0; } + + @Override + public String toString() + { + return "InternalSummaryCounters{" + + "nodesCreated=" + nodesCreated + + ", nodesDeleted=" + nodesDeleted + + ", relationshipsCreated=" + relationshipsCreated + + ", relationshipsDeleted=" + relationshipsDeleted + + ", propertiesSet=" + propertiesSet + + ", labelsAdded=" + labelsAdded + + ", labelsRemoved=" + labelsRemoved + + ", indexesAdded=" + indexesAdded + + ", indexesRemoved=" + indexesRemoved + + ", constraintsAdded=" + constraintsAdded + + ", constraintsRemoved=" + constraintsRemoved + + ", systemUpdates=" + systemUpdates + + '}'; + } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/util/LockUtil.java b/driver/src/main/java/org/neo4j/driver/internal/util/LockUtil.java new file mode 100644 index 0000000000..f308921ef3 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/util/LockUtil.java @@ -0,0 +1,61 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.util; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.locks.Lock; +import java.util.function.Supplier; + +public class LockUtil +{ + public static void executeWithLock( Lock lock, Runnable runnable ) + { + lock.lock(); + try + { + runnable.run(); + } + finally + { + lock.unlock(); + } + } + + public static T executeWithLock( Lock lock, Supplier supplier ) + { + lock.lock(); + try + { + return supplier.get(); + } + finally + { + lock.unlock(); + } + } + + public static void executeWithLockAsync( Lock lock, Supplier> stageSupplier ) + { + lock.lock(); + CompletableFuture.completedFuture( lock ) + .thenCompose( ignored -> stageSupplier.get() ) + .whenComplete( ( ignored, throwable ) -> lock.unlock() ); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/AddressSetTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/AddressSetTest.java deleted file mode 100644 index 57b80fac8a..0000000000 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/AddressSetTest.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Copyright (c) "Neo4j" - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.driver.internal.cluster; - -import org.junit.jupiter.api.Test; - -import java.util.LinkedHashSet; -import java.util.Set; - -import org.neo4j.driver.internal.BoltServerAddress; - -import static org.junit.jupiter.api.Assertions.assertArrayEquals; -import static org.junit.jupiter.api.Assertions.assertEquals; - -class AddressSetTest -{ - @Test - void shouldPreserveOrderWhenAdding() throws Exception - { - // given - Set servers = addresses( "one", "two", "tre" ); - - AddressSet set = new AddressSet(); - set.retainAllAndAdd( servers ); - - assertArrayEquals( new BoltServerAddress[]{ - new BoltServerAddress( "one" ), - new BoltServerAddress( "two" ), - new BoltServerAddress( "tre" )}, set.toArray() ); - - // when - servers.add( new BoltServerAddress( "fyr" ) ); - set.retainAllAndAdd( servers ); - - // then - assertArrayEquals( new BoltServerAddress[]{ - new BoltServerAddress( "one" ), - new BoltServerAddress( "two" ), - new BoltServerAddress( "tre" ), - new BoltServerAddress( "fyr" )}, set.toArray() ); - } - - @Test - void shouldPreserveOrderWhenRemoving() throws Exception - { - // given - Set servers = addresses( "one", "two", "tre" ); - AddressSet set = new AddressSet(); - set.retainAllAndAdd( servers ); - - assertArrayEquals( new BoltServerAddress[]{ - new BoltServerAddress( "one" ), - new BoltServerAddress( "two" ), - new BoltServerAddress( "tre" )}, set.toArray() ); - - // when - set.remove( new BoltServerAddress( "one" ) ); - - // then - assertArrayEquals( new BoltServerAddress[]{ - new BoltServerAddress( "two" ), - new BoltServerAddress( "tre" )}, set.toArray() ); - } - - @Test - void shouldPreserveOrderWhenRemovingThroughUpdate() throws Exception - { - // given - Set servers = addresses( "one", "two", "tre" ); - AddressSet set = new AddressSet(); - set.retainAllAndAdd( servers ); - - assertArrayEquals( new BoltServerAddress[]{ - new BoltServerAddress( "one" ), - new BoltServerAddress( "two" ), - new BoltServerAddress( "tre" )}, set.toArray() ); - - // when - servers.remove( new BoltServerAddress( "one" ) ); - set.retainAllAndAdd( servers ); - - // then - assertArrayEquals( new BoltServerAddress[]{ - new BoltServerAddress( "two" ), - new BoltServerAddress( "tre" )}, set.toArray() ); - } - - @Test - void shouldExposeEmptyArrayWhenEmpty() - { - AddressSet addressSet = new AddressSet(); - - BoltServerAddress[] addresses = addressSet.toArray(); - - assertEquals( 0, addresses.length ); - } - - @Test - void shouldExposeCorrectArray() - { - AddressSet addressSet = new AddressSet(); - addressSet.retainAllAndAdd( addresses( "one", "two", "tre" ) ); - - BoltServerAddress[] addresses = addressSet.toArray(); - - assertArrayEquals( new BoltServerAddress[]{ - new BoltServerAddress( "one" ), - new BoltServerAddress( "two" ), - new BoltServerAddress( "tre" )}, addresses ); - } - - @Test - void shouldHaveSizeZeroWhenEmpty() - { - AddressSet addressSet = new AddressSet(); - - assertEquals( 0, addressSet.size() ); - } - - @Test - void shouldHaveCorrectSize() - { - AddressSet addressSet = new AddressSet(); - addressSet.retainAllAndAdd( addresses( "one", "two" ) ); - - assertEquals( 2, addressSet.size() ); - } - - private static Set addresses( String... strings ) - { - Set set = new LinkedHashSet<>(); - for ( String string : strings ) - { - set.add( new BoltServerAddress( string ) ); - } - return set; - } -} diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/RediscoveryTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/RediscoveryTest.java index 0987774f0b..1aba6ecf17 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/RediscoveryTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/RediscoveryTest.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -507,9 +508,7 @@ private static RoutingTable routingTableMock( BoltServerAddress... routers ) private static RoutingTable routingTableMock( boolean preferInitialRouter, BoltServerAddress... routers ) { RoutingTable routingTable = mock( RoutingTable.class ); - AddressSet addressSet = new AddressSet(); - addressSet.retainAllAndAdd( asOrderedSet( routers ) ); - when( routingTable.routers() ).thenReturn( addressSet ); + when( routingTable.routers() ).thenReturn( Arrays.asList( routers ) ); when( routingTable.database() ).thenReturn( defaultDatabase() ); when( routingTable.preferInitialRouter() ).thenReturn( preferInitialRouter ); return routingTable; diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingTableHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingTableHandlerTest.java index b725731639..702ce41774 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingTableHandlerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingTableHandlerTest.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashSet; +import java.util.List; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletionStage; @@ -259,8 +260,7 @@ private static RoutingTable newStaleRoutingTableMock( AccessMode mode ) RoutingTable routingTable = mock( RoutingTable.class ); when( routingTable.isStaleFor( mode ) ).thenReturn( true ); - AddressSet addresses = new AddressSet(); - addresses.retainAllAndAdd( new HashSet<>( singletonList( LOCAL_DEFAULT ) ) ); + List addresses = singletonList( LOCAL_DEFAULT ); when( routingTable.readers() ).thenReturn( addresses ); when( routingTable.writers() ).thenReturn( addresses ); when( routingTable.database() ).thenReturn( defaultDatabase() ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LeastConnectedLoadBalancingStrategyTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LeastConnectedLoadBalancingStrategyTest.java index c2592e4a51..856ae881a0 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LeastConnectedLoadBalancingStrategyTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LeastConnectedLoadBalancingStrategyTest.java @@ -22,10 +22,13 @@ import org.junit.jupiter.api.Test; import org.mockito.Mock; -import org.neo4j.driver.internal.BoltServerAddress; -import org.neo4j.driver.internal.spi.ConnectionPool; +import java.util.Arrays; +import java.util.Collections; + import org.neo4j.driver.Logger; import org.neo4j.driver.Logging; +import org.neo4j.driver.internal.BoltServerAddress; +import org.neo4j.driver.internal.spi.ConnectionPool; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; @@ -54,15 +57,15 @@ void setUp() } @Test - void shouldHandleEmptyReadersArray() + void shouldHandleEmptyReaders() { - assertNull( strategy.selectReader( new BoltServerAddress[0] ) ); + assertNull( strategy.selectReader( Collections.emptyList() ) ); } @Test - void shouldHandleEmptyWritersArray() + void shouldHandleEmptyWriters() { - assertNull( strategy.selectWriter( new BoltServerAddress[0] ) ); + assertNull( strategy.selectWriter( Collections.emptyList() ) ); } @Test @@ -70,7 +73,7 @@ void shouldHandleSingleReaderWithoutActiveConnections() { BoltServerAddress address = new BoltServerAddress( "reader", 9999 ); - assertEquals( address, strategy.selectReader( new BoltServerAddress[]{address} ) ); + assertEquals( address, strategy.selectReader( Collections.singletonList( address ) ) ); } @Test @@ -78,7 +81,7 @@ void shouldHandleSingleWriterWithoutActiveConnections() { BoltServerAddress address = new BoltServerAddress( "writer", 9999 ); - assertEquals( address, strategy.selectWriter( new BoltServerAddress[]{address} ) ); + assertEquals( address, strategy.selectWriter( Collections.singletonList( address ) ) ); } @Test @@ -87,7 +90,7 @@ void shouldHandleSingleReaderWithActiveConnections() BoltServerAddress address = new BoltServerAddress( "reader", 9999 ); when( connectionPool.inUseConnections( address ) ).thenReturn( 42 ); - assertEquals( address, strategy.selectReader( new BoltServerAddress[]{address} ) ); + assertEquals( address, strategy.selectReader( Collections.singletonList( address ) ) ); } @Test @@ -96,7 +99,7 @@ void shouldHandleSingleWriterWithActiveConnections() BoltServerAddress address = new BoltServerAddress( "writer", 9999 ); when( connectionPool.inUseConnections( address ) ).thenReturn( 24 ); - assertEquals( address, strategy.selectWriter( new BoltServerAddress[]{address} ) ); + assertEquals( address, strategy.selectWriter( Collections.singletonList( address ) ) ); } @Test @@ -110,7 +113,7 @@ void shouldHandleMultipleReadersWithActiveConnections() when( connectionPool.inUseConnections( address2 ) ).thenReturn( 4 ); when( connectionPool.inUseConnections( address3 ) ).thenReturn( 1 ); - assertEquals( address3, strategy.selectReader( new BoltServerAddress[]{address1, address2, address3} ) ); + assertEquals( address3, strategy.selectReader( Arrays.asList( address1, address2, address3 ) ) ); } @Test @@ -127,7 +130,7 @@ void shouldHandleMultipleWritersWithActiveConnections() when( connectionPool.inUseConnections( address4 ) ).thenReturn( 1 ); assertEquals( address3, - strategy.selectWriter( new BoltServerAddress[]{address1, address2, address3, address4} ) ); + strategy.selectWriter( Arrays.asList( address1, address2, address3, address4 ) ) ); } @Test @@ -137,13 +140,13 @@ void shouldReturnDifferentReaderOnEveryInvocationWhenNoActiveConnections() BoltServerAddress address2 = new BoltServerAddress( "reader", 2 ); BoltServerAddress address3 = new BoltServerAddress( "reader", 3 ); - assertEquals( address1, strategy.selectReader( new BoltServerAddress[]{address1, address2, address3} ) ); - assertEquals( address2, strategy.selectReader( new BoltServerAddress[]{address1, address2, address3} ) ); - assertEquals( address3, strategy.selectReader( new BoltServerAddress[]{address1, address2, address3} ) ); + assertEquals( address1, strategy.selectReader( Arrays.asList( address1, address2, address3 ) ) ); + assertEquals( address2, strategy.selectReader( Arrays.asList( address1, address2, address3 ) ) ); + assertEquals( address3, strategy.selectReader( Arrays.asList( address1, address2, address3 ) ) ); - assertEquals( address1, strategy.selectReader( new BoltServerAddress[]{address1, address2, address3} ) ); - assertEquals( address2, strategy.selectReader( new BoltServerAddress[]{address1, address2, address3} ) ); - assertEquals( address3, strategy.selectReader( new BoltServerAddress[]{address1, address2, address3} ) ); + assertEquals( address1, strategy.selectReader( Arrays.asList( address1, address2, address3 ) ) ); + assertEquals( address2, strategy.selectReader( Arrays.asList( address1, address2, address3 ) ) ); + assertEquals( address3, strategy.selectReader( Arrays.asList( address1, address2, address3 ) ) ); } @Test @@ -152,11 +155,11 @@ void shouldReturnDifferentWriterOnEveryInvocationWhenNoActiveConnections() BoltServerAddress address1 = new BoltServerAddress( "writer", 1 ); BoltServerAddress address2 = new BoltServerAddress( "writer", 2 ); - assertEquals( address1, strategy.selectReader( new BoltServerAddress[]{address1, address2} ) ); - assertEquals( address2, strategy.selectReader( new BoltServerAddress[]{address1, address2} ) ); + assertEquals( address1, strategy.selectReader( Arrays.asList( address1, address2 ) ) ); + assertEquals( address2, strategy.selectReader( Arrays.asList( address1, address2 ) ) ); - assertEquals( address1, strategy.selectReader( new BoltServerAddress[]{address1, address2} ) ); - assertEquals( address2, strategy.selectReader( new BoltServerAddress[]{address1, address2} ) ); + assertEquals( address1, strategy.selectReader( Arrays.asList( address1, address2 ) ) ); + assertEquals( address2, strategy.selectReader( Arrays.asList( address1, address2 ) ) ); } @Test @@ -168,8 +171,8 @@ void shouldTraceLogWhenNoAddressSelected() LoadBalancingStrategy strategy = new LeastConnectedLoadBalancingStrategy( connectionPool, logging ); - strategy.selectReader( new BoltServerAddress[0] ); - strategy.selectWriter( new BoltServerAddress[0] ); + strategy.selectReader( Collections.emptyList() ); + strategy.selectWriter( Collections.emptyList() ); verify( logger ).trace( startsWith( "Unable to select" ), eq( "reader" ) ); verify( logger ).trace( startsWith( "Unable to select" ), eq( "writer" ) ); @@ -186,8 +189,8 @@ void shouldTraceLogSelectedAddress() LoadBalancingStrategy strategy = new LeastConnectedLoadBalancingStrategy( connectionPool, logging ); - strategy.selectReader( new BoltServerAddress[]{A} ); - strategy.selectWriter( new BoltServerAddress[]{A} ); + strategy.selectReader( Collections.singletonList( A ) ); + strategy.selectWriter( Collections.singletonList( A ) ); verify( logger ).trace( startsWith( "Selected" ), eq( "reader" ), eq( A ), eq( 42 ) ); verify( logger ).trace( startsWith( "Selected" ), eq( "writer" ), eq( A ), eq( 42 ) ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java index 520c81946c..9c530ec282 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java @@ -25,8 +25,10 @@ import org.junit.jupiter.params.provider.ValueSource; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashSet; +import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.function.Function; @@ -39,7 +41,6 @@ import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.internal.async.ConnectionContext; import org.neo4j.driver.internal.async.connection.RoutingConnection; -import org.neo4j.driver.internal.cluster.AddressSet; import org.neo4j.driver.internal.cluster.ClusterComposition; import org.neo4j.driver.internal.cluster.ClusterRoutingTable; import org.neo4j.driver.internal.cluster.Rediscovery; @@ -96,10 +97,8 @@ void returnsCorrectAccessMode( AccessMode mode ) { ConnectionPool connectionPool = newConnectionPoolMock(); RoutingTable routingTable = mock( RoutingTable.class ); - AddressSet readerAddresses = mock( AddressSet.class ); - AddressSet writerAddresses = mock( AddressSet.class ); - when( readerAddresses.toArray() ).thenReturn( new BoltServerAddress[]{A} ); - when( writerAddresses.toArray() ).thenReturn( new BoltServerAddress[]{B} ); + List readerAddresses = Collections.singletonList( A ); + List writerAddresses = Collections.singletonList( B ); when( routingTable.readers() ).thenReturn( readerAddresses ); when( routingTable.writers() ).thenReturn( writerAddresses ); @@ -117,8 +116,7 @@ void returnsCorrectDatabaseName( String databaseName ) { ConnectionPool connectionPool = newConnectionPoolMock(); RoutingTable routingTable = mock( RoutingTable.class ); - AddressSet writerAddresses = mock( AddressSet.class ); - when( writerAddresses.toArray() ).thenReturn( new BoltServerAddress[]{A} ); + List writerAddresses = Collections.singletonList( A ); when( routingTable.writers() ).thenReturn( writerAddresses ); LoadBalancer loadBalancer = newLoadBalancer( connectionPool, routingTable ); @@ -135,15 +133,17 @@ void shouldThrowWhenRediscoveryReturnsNoSuitableServers() { ConnectionPool connectionPool = newConnectionPoolMock(); RoutingTable routingTable = mock( RoutingTable.class ); - when( routingTable.readers() ).thenReturn( new AddressSet() ); - when( routingTable.writers() ).thenReturn( new AddressSet() ); + when( routingTable.readers() ).thenReturn( Collections.emptyList() ); + when( routingTable.writers() ).thenReturn( Collections.emptyList() ); LoadBalancer loadBalancer = newLoadBalancer( connectionPool, routingTable ); - SessionExpiredException error1 = assertThrows( SessionExpiredException.class, () -> await( loadBalancer.acquireConnection( contextWithMode( READ ) ) ) ); + SessionExpiredException error1 = + assertThrows( SessionExpiredException.class, () -> await( loadBalancer.acquireConnection( contextWithMode( READ ) ) ) ); assertThat( error1.getMessage(), startsWith( "Failed to obtain connection towards READ server" ) ); - SessionExpiredException error2 = assertThrows( SessionExpiredException.class, () -> await( loadBalancer.acquireConnection( contextWithMode( WRITE ) ) ) ); + SessionExpiredException error2 = + assertThrows( SessionExpiredException.class, () -> await( loadBalancer.acquireConnection( contextWithMode( WRITE ) ) ) ); assertThat( error2.getMessage(), startsWith( "Failed to obtain connection towards WRITE server" ) ); } @@ -157,8 +157,7 @@ void shouldSelectLeastConnectedAddress() when( connectionPool.inUseConnections( C ) ).thenReturn( 0 ); RoutingTable routingTable = mock( RoutingTable.class ); - AddressSet readerAddresses = mock( AddressSet.class ); - when( readerAddresses.toArray() ).thenReturn( new BoltServerAddress[]{A, B, C} ); + List readerAddresses = Arrays.asList( A, B, C ); when( routingTable.readers() ).thenReturn( readerAddresses ); @@ -182,8 +181,7 @@ void shouldRoundRobinWhenNoActiveConnections() ConnectionPool connectionPool = newConnectionPoolMock(); RoutingTable routingTable = mock( RoutingTable.class ); - AddressSet readerAddresses = mock( AddressSet.class ); - when( readerAddresses.toArray() ).thenReturn( new BoltServerAddress[]{A, B, C} ); + List readerAddresses = Arrays.asList( A, B, C ); when( routingTable.readers() ).thenReturn( readerAddresses ); LoadBalancer loadBalancer = newLoadBalancer( connectionPool, routingTable ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/Matchers.java b/driver/src/test/java/org/neo4j/driver/internal/util/Matchers.java index 75e85df9e4..33c3897f42 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/util/Matchers.java +++ b/driver/src/test/java/org/neo4j/driver/internal/util/Matchers.java @@ -32,8 +32,6 @@ import org.neo4j.driver.internal.InternalDriver; import org.neo4j.driver.internal.SessionFactory; import org.neo4j.driver.internal.SessionFactoryImpl; -import org.neo4j.driver.internal.cluster.AddressSet; -import org.neo4j.driver.internal.cluster.RoutingTable; import org.neo4j.driver.internal.cluster.loadbalancing.LoadBalancer; import org.neo4j.driver.internal.spi.ConnectionProvider; import org.neo4j.driver.summary.ResultSummary; @@ -44,69 +42,6 @@ private Matchers() { } - public static Matcher containsRouter( final BoltServerAddress address ) - { - return new TypeSafeMatcher() - { - @Override - protected boolean matchesSafely( RoutingTable routingTable ) - { - BoltServerAddress[] addresses = routingTable.routers().toArray(); - - for ( BoltServerAddress currentAddress : addresses ) - { - if ( currentAddress.equals( address ) ) - { - return true; - } - } - return false; - } - - @Override - public void describeTo( Description description ) - { - description.appendText( "routing table that contains router " ).appendValue( address ); - } - }; - } - - public static Matcher containsReader( final BoltServerAddress address ) - { - return new TypeSafeMatcher() - { - @Override - protected boolean matchesSafely( RoutingTable routingTable ) - { - return contains( routingTable.readers(), address ); - } - - @Override - public void describeTo( Description description ) - { - description.appendText( "routing table that contains reader " ).appendValue( address ); - } - }; - } - - public static Matcher containsWriter( final BoltServerAddress address ) - { - return new TypeSafeMatcher() - { - @Override - protected boolean matchesSafely( RoutingTable routingTable ) - { - return contains( routingTable.writers(), address ); - } - - @Override - public void describeTo( Description description ) - { - description.appendText( "routing table that contains writer " ).appendValue( address ); - } - }; - } - public static Matcher directDriver() { return new TypeSafeMatcher() @@ -273,19 +208,6 @@ public void describeTo( Description description ) }; } - private static boolean contains( AddressSet set, BoltServerAddress address ) - { - BoltServerAddress[] addresses = set.toArray(); - for ( BoltServerAddress currentAddress : addresses ) - { - if ( currentAddress.equals( address ) ) - { - return true; - } - } - return false; - } - private static boolean hasConnectionProvider( Driver driver, Class providerClass ) { return extractConnectionProvider( driver, providerClass ) != null; diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestProcessorHandler.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestProcessorHandler.java index 83b97b85be..bd5d0352ea 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestProcessorHandler.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestProcessorHandler.java @@ -51,13 +51,13 @@ public TestkitRequestProcessorHandler( BackendMode backendMode ) switch ( backendMode ) { case ASYNC: - processorImpl = TestkitRequestProcessorHandler::wrapSyncRequest; + processorImpl = TestkitRequest::processAsync; break; case REACTIVE: processorImpl = ( request, state ) -> request.processRx( state ).toFuture(); break; default: - processorImpl = TestkitRequest::processAsync; + processorImpl = TestkitRequestProcessorHandler::wrapSyncRequest; break; } } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java index 41d2bf0d66..0be682d5e3 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java @@ -25,7 +25,6 @@ import neo4j.org.testkit.backend.messages.responses.TestkitResponse; import reactor.core.publisher.Mono; -import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -35,7 +34,6 @@ import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.internal.DatabaseName; import org.neo4j.driver.internal.DatabaseNameUtil; -import org.neo4j.driver.internal.cluster.AddressSet; import org.neo4j.driver.internal.cluster.RoutingTableHandler; import org.neo4j.driver.internal.cluster.RoutingTableRegistry; @@ -43,6 +41,11 @@ @Getter public class GetRoutingTable implements TestkitRequest { + private static final Function,List> ADDRESSES_TO_STRINGS = + ( addresses ) -> addresses.stream() + .map( address -> String.format( "%s:%d", address.host(), address.port() ) ) + .collect( Collectors.toList() ); + private GetRoutingTableBody data; @Override @@ -61,17 +64,15 @@ public TestkitResponse process( TestkitState testkitState ) String.format( "There is no routing table handler for the '%s' database.", databaseName.databaseName().orElse( "null" ) ) ) ); org.neo4j.driver.internal.cluster.RoutingTable routingTable = routingTableHandler.routingTable(); - Function> addressesToStrings = ( addresses ) -> Arrays.stream( addresses.toArray() ) - .map( BoltServerAddress::toString ).collect( Collectors.toList() ); return RoutingTable .builder() .data( RoutingTable.RoutingTableBody .builder() .database( databaseName.databaseName().orElse( null ) ) - .routers( addressesToStrings.apply( routingTable.routers() ) ) - .readers( addressesToStrings.apply( routingTable.readers() ) ) - .writers( addressesToStrings.apply( routingTable.writers() ) ) + .routers( ADDRESSES_TO_STRINGS.apply( routingTable.routers() ) ) + .readers( ADDRESSES_TO_STRINGS.apply( routingTable.readers() ) ) + .writers( ADDRESSES_TO_STRINGS.apply( routingTable.writers() ) ) .build() ).build(); }