diff --git a/README.md b/README.md index 215f9f9b78..b951227d48 100644 --- a/README.md +++ b/README.md @@ -128,6 +128,9 @@ Make sure to run build for the whole project and not just for `testkit-tests` mo - `mvn clean verify -DtestkitArgs='--tests STUB_TESTS'` - runs all project tests and Testkit stub tests. - `mvn clean verify -DskipTests -P testkit-tests` - skips all project tests and runs Testkit tests. - `mvn clean verify -DskipTests -DtestkitArgs='--tests STUB_TESTS'` - skips all project tests and runs Testkit stub tests. +- `mvn clean verify -DskipITs -DtestkitArgs='--tests STUB_TESTS'` - skips all integration tests and runs Testkit stub tests. + +If you interrupt Maven build, you have to remove Testkit containers manually. ##### Running Testkit manually diff --git a/driver/src/main/java/org/neo4j/driver/exceptions/ConnectionReadTimeoutException.java b/driver/src/main/java/org/neo4j/driver/exceptions/ConnectionReadTimeoutException.java new file mode 100644 index 0000000000..4ed5a66383 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/exceptions/ConnectionReadTimeoutException.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.exceptions; + +/** + * Indicates that read timed out due to it taking longer than the server-supplied timeout value via the {@code connection.recv_timeout_seconds} configuration + * hint. The server might provide this value to clients to let them know when a given connection may be considered broken if client does not get any + * communication from the server within the specified timeout period. This results in the server being removed from the routing table. + */ +public class ConnectionReadTimeoutException extends ServiceUnavailableException +{ + public static final ConnectionReadTimeoutException INSTANCE = new ConnectionReadTimeoutException( + "Connection read timed out due to it taking longer than the server-supplied timeout value via configuration hint." ); + + public ConnectionReadTimeoutException( String message ) + { + super( message ); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java index 7c8f6bbcc7..fe7e31a631 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java +++ b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java @@ -207,12 +207,25 @@ protected InternalDriver createDriver( SecurityPlan securityPlan, SessionFactory * This method is protected only for testing */ protected LoadBalancer createLoadBalancer( BoltServerAddress address, ConnectionPool connectionPool, - EventExecutorGroup eventExecutorGroup, Config config, RoutingSettings routingSettings ) + EventExecutorGroup eventExecutorGroup, Config config, RoutingSettings routingSettings ) { LoadBalancingStrategy loadBalancingStrategy = new LeastConnectedLoadBalancingStrategy( connectionPool, config.logging() ); ServerAddressResolver resolver = createResolver( config ); - return new LoadBalancer( address, routingSettings, connectionPool, eventExecutorGroup, createClock(), - config.logging(), loadBalancingStrategy, resolver, getDomainNameResolver() ); + LoadBalancer loadBalancer = new LoadBalancer( address, routingSettings, connectionPool, eventExecutorGroup, createClock(), + config.logging(), loadBalancingStrategy, resolver, getDomainNameResolver() ); + handleNewLoadBalancer( loadBalancer ); + return loadBalancer; + } + + /** + * Handles new {@link LoadBalancer} instance. + *

+ * This method is protected for Testkit backend usage only. + * + * @param loadBalancer the new load balancer instance. + */ + protected void handleNewLoadBalancer( LoadBalancer loadBalancer ) + { } private static ServerAddressResolver createResolver( Config config ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/NetworkConnection.java b/driver/src/main/java/org/neo4j/driver/internal/async/NetworkConnection.java index 6bbbd3fd33..8b6f6c8fbc 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/NetworkConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/NetworkConnection.java @@ -19,13 +19,18 @@ package org.neo4j.driver.internal.async; import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import org.neo4j.driver.Logger; +import org.neo4j.driver.Logging; import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.internal.async.connection.ChannelAttributes; +import org.neo4j.driver.internal.async.inbound.ConnectionReadTimeoutHandler; import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher; import org.neo4j.driver.internal.async.pool.ExtendedChannelPool; import org.neo4j.driver.internal.handlers.ChannelReleasingResetResponseHandler; @@ -45,13 +50,12 @@ import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setTerminationReason; /** - * This connection represents a simple network connection to a remote server. - * It wraps a channel obtained from a connection pool. - * The life cycle of this connection start from the moment the channel is borrowed out of the pool - * and end at the time the connection is released back to the pool. + * This connection represents a simple network connection to a remote server. It wraps a channel obtained from a connection pool. The life cycle of this + * connection start from the moment the channel is borrowed out of the pool and end at the time the connection is released back to the pool. */ public class NetworkConnection implements Connection { + private final Logger log; private final Channel channel; private final InboundMessageDispatcher messageDispatcher; private final String serverAgent; @@ -66,8 +70,12 @@ public class NetworkConnection implements Connection private final MetricsListener metricsListener; private final ListenerEvent inUseEvent; - public NetworkConnection( Channel channel, ExtendedChannelPool channelPool, Clock clock, MetricsListener metricsListener ) + private final Long connectionReadTimeout; + private ChannelHandler connectionReadTimeoutHandler; + + public NetworkConnection( Channel channel, ExtendedChannelPool channelPool, Clock clock, MetricsListener metricsListener, Logging logging ) { + this.log = logging.getLog( this.getClass().getCanonicalName() ); this.channel = channel; this.messageDispatcher = ChannelAttributes.messageDispatcher( channel ); this.serverAgent = ChannelAttributes.serverAgent( channel ); @@ -79,6 +87,7 @@ public NetworkConnection( Channel channel, ExtendedChannelPool channelPool, Cloc this.clock = clock; this.metricsListener = metricsListener; this.inUseEvent = metricsListener.createListenerEvent(); + this.connectionReadTimeout = ChannelAttributes.connectionReadTimeout( channel ).orElse( null ); metricsListener.afterConnectionCreated( poolId( this.channel ), this.inUseEvent ); } @@ -225,14 +234,19 @@ private void writeResetMessageIfNeeded( ResponseHandler resetHandler, boolean is setAutoRead( true ); messageDispatcher.enqueue( resetHandler ); - channel.writeAndFlush( ResetMessage.RESET, channel.voidPromise() ); + channel.writeAndFlush( ResetMessage.RESET ).addListener( future -> registerConnectionReadTimeout( channel ) ); } } ); } private void flushInEventLoop() { - channel.eventLoop().execute( channel::flush ); + channel.eventLoop().execute( + () -> + { + channel.flush(); + registerConnectionReadTimeout( channel ); + } ); } private void writeMessageInEventLoop( Message message, ResponseHandler handler, boolean flush ) @@ -243,7 +257,7 @@ private void writeMessageInEventLoop( Message message, ResponseHandler handler, if ( flush ) { - channel.writeAndFlush( message, channel.voidPromise() ); + channel.writeAndFlush( message ).addListener( future -> registerConnectionReadTimeout( channel ) ); } else { @@ -263,7 +277,7 @@ private void writeMessagesInEventLoop( Message message1, ResponseHandler handler if ( flush ) { - channel.writeAndFlush( message2, channel.voidPromise() ); + channel.writeAndFlush( message2 ).addListener( future -> registerConnectionReadTimeout( channel ) ); } else { @@ -311,6 +325,29 @@ private boolean verifyOpen( ResponseHandler handler1, ResponseHandler handler2 ) } } + private void registerConnectionReadTimeout( Channel channel ) + { + if ( !channel.eventLoop().inEventLoop() ) + { + throw new IllegalStateException( "This method may only be called in the EventLoop" ); + } + + if ( connectionReadTimeout != null && connectionReadTimeoutHandler == null ) + { + connectionReadTimeoutHandler = new ConnectionReadTimeoutHandler( connectionReadTimeout, TimeUnit.SECONDS ); + channel.pipeline().addFirst( connectionReadTimeoutHandler ); + log.debug( "Added ConnectionReadTimeoutHandler" ); + messageDispatcher.setBeforeLastHandlerHook( + ( messageType ) -> + { + channel.pipeline().remove( connectionReadTimeoutHandler ); + connectionReadTimeoutHandler = null; + messageDispatcher.setBeforeLastHandlerHook( null ); + log.debug( "Removed ConnectionReadTimeoutHandler" ); + } ); + } + } + private enum Status { OPEN, diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java index 14bb02828c..a46b4d628a 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java @@ -31,6 +31,7 @@ import org.neo4j.driver.async.ResultCursor; import org.neo4j.driver.exceptions.AuthorizationExpiredException; import org.neo4j.driver.exceptions.ClientException; +import org.neo4j.driver.exceptions.ConnectionReadTimeoutException; import org.neo4j.driver.internal.BookmarkHolder; import org.neo4j.driver.internal.cursor.AsyncResultCursor; import org.neo4j.driver.internal.cursor.RxResultCursor; @@ -147,6 +148,10 @@ public CompletionStage beginAsync( Bookmark initialBookmar { connection.terminateAndRelease( AuthorizationExpiredException.DESCRIPTION ); } + else if ( beginError instanceof ConnectionReadTimeoutException ) + { + connection.terminateAndRelease( beginError.getMessage() ); + } else { connection.release(); @@ -325,6 +330,10 @@ private void handleTransactionCompletion( boolean commitOnSuccess, Throwable thr { connection.terminateAndRelease( AuthorizationExpiredException.DESCRIPTION ); } + else if ( throwable instanceof ConnectionReadTimeoutException ) + { + connection.terminateAndRelease( throwable.getMessage() ); + } else { connection.release(); // release in background diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/connection/ChannelAttributes.java b/driver/src/main/java/org/neo4j/driver/internal/async/connection/ChannelAttributes.java index a8773211ef..f3a8eefc95 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/connection/ChannelAttributes.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/connection/ChannelAttributes.java @@ -21,6 +21,8 @@ import io.netty.channel.Channel; import io.netty.util.AttributeKey; +import java.util.Optional; + import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher; import org.neo4j.driver.internal.messaging.BoltProtocolVersion; @@ -42,6 +44,9 @@ public final class ChannelAttributes private static final AttributeKey TERMINATION_REASON = newInstance( "terminationReason" ); private static final AttributeKey AUTHORIZATION_STATE_LISTENER = newInstance( "authorizationStateListener" ); + // configuration hints provided by the server + private static final AttributeKey CONNECTION_READ_TIMEOUT = newInstance( "connectionReadTimeout" ); + private ChannelAttributes() { } @@ -156,6 +161,16 @@ public static void setAuthorizationStateListener( Channel channel, Authorization set( channel, AUTHORIZATION_STATE_LISTENER, authorizationStateListener ); } + public static Optional connectionReadTimeout( Channel channel ) + { + return Optional.ofNullable( get( channel, CONNECTION_READ_TIMEOUT ) ); + } + + public static void setConnectionReadTimeout( Channel channel, Long connectionReadTimeout ) + { + setOnce( channel, CONNECTION_READ_TIMEOUT, connectionReadTimeout ); + } + private static T get( Channel channel, AttributeKey key ) { return channel.attr( key ).get(); diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java index 015350e319..311da7a0f0 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java @@ -24,11 +24,12 @@ import java.io.IOException; -import org.neo4j.driver.internal.logging.ChannelActivityLogger; -import org.neo4j.driver.internal.util.ErrorUtil; import org.neo4j.driver.Logger; import org.neo4j.driver.Logging; +import org.neo4j.driver.exceptions.ConnectionReadTimeoutException; import org.neo4j.driver.exceptions.ServiceUnavailableException; +import org.neo4j.driver.internal.logging.ChannelActivityLogger; +import org.neo4j.driver.internal.util.ErrorUtil; import static java.util.Objects.requireNonNull; import static org.neo4j.driver.internal.async.connection.ChannelAttributes.messageDispatcher; @@ -94,11 +95,19 @@ public void exceptionCaught( ChannelHandlerContext ctx, Throwable error ) else { failed = true; - log.warn( "Fatal error occurred in the pipeline", error ); + logUnexpectedErrorWarning( error ); fail( error ); } } + private void logUnexpectedErrorWarning( Throwable error ) + { + if ( !(error instanceof ConnectionReadTimeoutException) ) + { + log.warn( "Fatal error occurred in the pipeline", error ); + } + } + private void fail( Throwable error ) { Throwable cause = transformError( error ); diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ConnectionReadTimeoutHandler.java b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ConnectionReadTimeoutHandler.java new file mode 100644 index 0000000000..81ece4335a --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ConnectionReadTimeoutHandler.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.async.inbound; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.timeout.ReadTimeoutHandler; + +import java.util.concurrent.TimeUnit; + +import org.neo4j.driver.exceptions.ConnectionReadTimeoutException; + +public class ConnectionReadTimeoutHandler extends ReadTimeoutHandler +{ + private boolean triggered; + + public ConnectionReadTimeoutHandler( long timeout, TimeUnit unit ) + { + super( timeout, unit ); + } + + @Override + protected void readTimedOut( ChannelHandlerContext ctx ) + { + if ( !triggered ) + { + ctx.fireExceptionCaught( ConnectionReadTimeoutException.INSTANCE ); + ctx.close(); + triggered = true; + } + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java index 881c445291..c1f08de21c 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java @@ -50,6 +50,7 @@ public class InboundMessageDispatcher implements ResponseMessageHandler private volatile boolean gracefullyClosed; private Throwable currentError; private boolean fatalErrorOccurred; + private HandlerHook beforeLastHandlerHook; private ResponseHandler autoReadManagingHandler; @@ -72,6 +73,15 @@ public void enqueue( ResponseHandler handler ) } } + public void setBeforeLastHandlerHook( HandlerHook beforeLastHandlerHook ) + { + if ( !channel.eventLoop().inEventLoop() ) + { + throw new IllegalStateException( "This method may only be called in the EventLoop" ); + } + this.beforeLastHandlerHook = beforeLastHandlerHook; + } + public int queuedHandlersCount() { return handlers.size(); @@ -81,6 +91,7 @@ public int queuedHandlersCount() public void handleSuccessMessage( Map meta ) { log.debug( "S: SUCCESS %s", meta ); + invokeBeforeLastHandlerHook( HandlerHook.MessageType.SUCCESS ); ResponseHandler handler = removeHandler(); handler.onSuccess( meta ); } @@ -127,6 +138,7 @@ public void handleFailureMessage( String code, String message ) channel.writeAndFlush( RESET, channel.voidPromise() ); } + invokeBeforeLastHandlerHook( HandlerHook.MessageType.FAILURE ); ResponseHandler handler = removeHandler(); handler.onFailure( currentError ); } @@ -250,4 +262,23 @@ private void updateAutoReadManagingHandler( ResponseHandler newHandler ) } autoReadManagingHandler = newHandler; } + + private void invokeBeforeLastHandlerHook( HandlerHook.MessageType messageType ) + { + if ( handlers.size() == 1 && beforeLastHandlerHook != null ) + { + beforeLastHandlerHook.run( messageType ); + } + } + + public interface HandlerHook + { + enum MessageType + { + SUCCESS, + FAILURE + } + + void run( MessageType messageType ); + } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java b/driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java index d8def041cb..479d5bdb08 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java @@ -71,7 +71,7 @@ public ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, Pool { this( connector, bootstrap, new NettyChannelTracker( metricsListener, bootstrap.config().group().next(), logging ), new NettyChannelHealthChecker( settings, clock, logging ), settings, metricsListener, logging, - clock, ownsEventLoopGroup, new NetworkConnectionFactory( clock, metricsListener ) ); + clock, ownsEventLoopGroup, new NetworkConnectionFactory( clock, metricsListener, logging ) ); } protected ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, NettyChannelTracker nettyChannelTracker, diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/pool/NetworkConnectionFactory.java b/driver/src/main/java/org/neo4j/driver/internal/async/pool/NetworkConnectionFactory.java index db05a3bf97..1a3404e950 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/pool/NetworkConnectionFactory.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/pool/NetworkConnectionFactory.java @@ -20,6 +20,7 @@ import io.netty.channel.Channel; +import org.neo4j.driver.Logging; import org.neo4j.driver.internal.async.NetworkConnection; import org.neo4j.driver.internal.metrics.MetricsListener; import org.neo4j.driver.internal.spi.Connection; @@ -29,16 +30,18 @@ public class NetworkConnectionFactory implements ConnectionFactory { private final Clock clock; private final MetricsListener metricsListener; + private final Logging logging; - public NetworkConnectionFactory( Clock clock, MetricsListener metricsListener ) + public NetworkConnectionFactory( Clock clock, MetricsListener metricsListener, Logging logging ) { this.clock = clock; this.metricsListener = metricsListener; + this.logging = logging; } @Override public Connection createConnection( Channel channel, ExtendedChannelPool pool ) { - return new NetworkConnection( channel, pool, clock, metricsListener ); + return new NetworkConnection( channel, pool, clock, metricsListener, logging ); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTableRegistry.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTableRegistry.java index abe833639f..f023f0e1c0 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTableRegistry.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTableRegistry.java @@ -18,6 +18,7 @@ */ package org.neo4j.driver.internal.cluster; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletionStage; @@ -52,4 +53,12 @@ public interface RoutingTableRegistry * Removes all routing tables that has been not used for a long time. */ void removeAged(); + + /** + * Returns routing table handler for the given database name if it exists in the registry. + * + * @param databaseName the database name + * @return the routing table handler for the requested database name + */ + Optional getRoutingTableHandler( DatabaseName databaseName ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTableRegistryImpl.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTableRegistryImpl.java index 9c3ba3bceb..018dcb6586 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTableRegistryImpl.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTableRegistryImpl.java @@ -19,6 +19,7 @@ package org.neo4j.driver.internal.cluster; import java.util.HashSet; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; @@ -79,14 +80,23 @@ public void remove( DatabaseName databaseName ) @Override public void removeAged() { - routingTableHandlers.forEach( ( databaseName, handler ) -> { - if ( handler.isRoutingTableAged() ) - { - logger.info( "Routing table handler for database '%s' is removed because it has not been used for a long time. Routing table: %s", - databaseName.description(), handler.routingTable() ); - routingTableHandlers.remove( databaseName ); - } - } ); + routingTableHandlers.forEach( + ( databaseName, handler ) -> + { + if ( handler.isRoutingTableAged() ) + { + logger.info( + "Routing table handler for database '%s' is removed because it has not been used for a long time. Routing table: %s", + databaseName.description(), handler.routingTable() ); + routingTableHandlers.remove( databaseName ); + } + } ); + } + + @Override + public Optional getRoutingTableHandler( DatabaseName databaseName ) + { + return Optional.ofNullable( routingTableHandlers.get( databaseName ) ); } // For tests @@ -97,11 +107,13 @@ public boolean contains( DatabaseName databaseName ) private RoutingTableHandler getOrCreate( DatabaseName databaseName ) { - return routingTableHandlers.computeIfAbsent( databaseName, name -> { - RoutingTableHandler handler = factory.newInstance( name, this ); - logger.debug( "Routing table handler for database '%s' is added.", databaseName.description() ); - return handler; - } ); + return routingTableHandlers.computeIfAbsent( + databaseName, name -> + { + RoutingTableHandler handler = factory.newInstance( name, this ); + logger.debug( "Routing table handler for database '%s' is added.", databaseName.description() ); + return handler; + } ); } static class RoutingTableHandlerFactory diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java index 7174fdf166..16a741efb8 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java @@ -175,12 +175,19 @@ public CompletionStage supportsMultiDb() } ); } + public RoutingTableRegistry getRoutingTableRegistry() + { + return routingTables; + } + private CompletionStage supportsMultiDb( BoltServerAddress address ) { - return connectionPool.acquire( address ).thenCompose( conn -> { - boolean supportsMultiDatabase = supportsMultiDatabase( conn ); - return conn.release().thenApply( ignored -> supportsMultiDatabase ); - } ); + return connectionPool.acquire( address ).thenCompose( + conn -> + { + boolean supportsMultiDatabase = supportsMultiDatabase( conn ); + return conn.release().thenApply( ignored -> supportsMultiDatabase ); + } ); } private CompletionStage acquire( AccessMode mode, RoutingTable routingTable ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/HelloResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/HelloResponseHandler.java index 95c97849f7..af028cef16 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/HelloResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/HelloResponseHandler.java @@ -22,6 +22,8 @@ import io.netty.channel.ChannelPromise; import java.util.Map; +import java.util.Optional; +import java.util.function.Supplier; import org.neo4j.driver.Value; import org.neo4j.driver.internal.messaging.BoltProtocolVersion; @@ -29,6 +31,7 @@ import org.neo4j.driver.internal.spi.ResponseHandler; import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setConnectionId; +import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setConnectionReadTimeout; import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setServerAgent; import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setServerVersion; import static org.neo4j.driver.internal.util.MetadataExtractor.extractNeo4jServerVersion; @@ -38,6 +41,8 @@ public class HelloResponseHandler implements ResponseHandler { private static final String CONNECTION_ID_METADATA_KEY = "connection_id"; + public static final String CONFIGURATION_HINTS_KEY = "hints"; + public static final String CONNECTION_RECEIVE_TIMEOUT_SECONDS_KEY = "connection.recv_timeout_seconds"; private final ChannelPromise connectionInitializedPromise; private final Channel channel; @@ -72,6 +77,8 @@ public void onSuccess( Map metadata ) String connectionId = extractConnectionId( metadata ); setConnectionId( channel, connectionId ); + processConfigurationHints( metadata ); + connectionInitializedPromise.setSuccess(); } catch ( Throwable error ) @@ -103,4 +110,26 @@ private static String extractConnectionId( Map metadata ) } return value.asString(); } + + private void processConfigurationHints( Map metadata ) + { + Value configurationHints = metadata.get( CONFIGURATION_HINTS_KEY ); + if ( configurationHints != null ) + { + getFromSupplierOrEmptyOnException( () -> configurationHints.get( CONNECTION_RECEIVE_TIMEOUT_SECONDS_KEY ).asLong() ) + .ifPresent( timeout -> setConnectionReadTimeout( channel, timeout ) ); + } + } + + private static Optional getFromSupplierOrEmptyOnException( Supplier supplier ) + { + try + { + return Optional.of( supplier.get() ); + } + catch ( Exception e ) + { + return Optional.empty(); + } + } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/RoutingResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/RoutingResponseHandler.java index 8a3d7afd59..02d2f6a898 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/RoutingResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/RoutingResponseHandler.java @@ -21,16 +21,16 @@ import java.util.Map; import java.util.Objects; -import org.neo4j.driver.internal.BoltServerAddress; -import org.neo4j.driver.internal.RoutingErrorHandler; -import org.neo4j.driver.internal.spi.ResponseHandler; -import org.neo4j.driver.internal.util.Futures; import org.neo4j.driver.AccessMode; import org.neo4j.driver.Value; import org.neo4j.driver.exceptions.ClientException; import org.neo4j.driver.exceptions.ServiceUnavailableException; import org.neo4j.driver.exceptions.SessionExpiredException; import org.neo4j.driver.exceptions.TransientException; +import org.neo4j.driver.internal.BoltServerAddress; +import org.neo4j.driver.internal.RoutingErrorHandler; +import org.neo4j.driver.internal.spi.ResponseHandler; +import org.neo4j.driver.internal.util.Futures; import static java.lang.String.format; diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/RunResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/RunResponseHandler.java index edc61123d0..31a6a69e5c 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/RunResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/RunResponseHandler.java @@ -23,6 +23,7 @@ import org.neo4j.driver.Value; import org.neo4j.driver.exceptions.AuthorizationExpiredException; +import org.neo4j.driver.exceptions.ConnectionReadTimeoutException; import org.neo4j.driver.internal.async.UnmanagedTransaction; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ResponseHandler; @@ -70,6 +71,10 @@ else if ( error instanceof AuthorizationExpiredException ) { connection.terminateAndRelease( AuthorizationExpiredException.DESCRIPTION ); } + else if ( error instanceof ConnectionReadTimeoutException ) + { + connection.terminateAndRelease( error.getMessage() ); + } runFuture.completeExceptionally( error ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/SessionPullResponseCompletionListener.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/SessionPullResponseCompletionListener.java index 73b9473829..5ff2d63dc0 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/SessionPullResponseCompletionListener.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/SessionPullResponseCompletionListener.java @@ -22,6 +22,7 @@ import org.neo4j.driver.Value; import org.neo4j.driver.exceptions.AuthorizationExpiredException; +import org.neo4j.driver.exceptions.ConnectionReadTimeoutException; import org.neo4j.driver.internal.BookmarkHolder; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.util.MetadataExtractor; @@ -53,6 +54,10 @@ public void afterFailure( Throwable error ) { connection.terminateAndRelease( AuthorizationExpiredException.DESCRIPTION ); } + else if ( error instanceof ConnectionReadTimeoutException ) + { + connection.terminateAndRelease( error.getMessage() ); + } else { releaseConnection(); diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/NetworkConnectionTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/NetworkConnectionTest.java index d118fb6dcf..ff50cf5234 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/NetworkConnectionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/NetworkConnectionTest.java @@ -641,7 +641,7 @@ private static NetworkConnection newConnection( Channel channel ) private static NetworkConnection newConnection( Channel channel, ExtendedChannelPool pool ) { - return new NetworkConnection( channel, pool, new FakeClock(), DEV_NULL_METRICS ); + return new NetworkConnection( channel, pool, new FakeClock(), DEV_NULL_METRICS, DEV_NULL_LOGGING ); } private static void assertConnectionReleasedError( IllegalStateException e ) diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java index ef1846adf6..b8909e176f 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java @@ -26,7 +26,9 @@ import org.neo4j.driver.Bookmark; import org.neo4j.driver.Query; import org.neo4j.driver.TransactionConfig; +import org.neo4j.driver.exceptions.AuthorizationExpiredException; import org.neo4j.driver.exceptions.ClientException; +import org.neo4j.driver.exceptions.ConnectionReadTimeoutException; import org.neo4j.driver.internal.DefaultBookmarkHolder; import org.neo4j.driver.internal.FailableCursor; import org.neo4j.driver.internal.InternalBookmark; @@ -38,6 +40,7 @@ import static java.util.concurrent.CompletableFuture.completedFuture; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; @@ -276,12 +279,44 @@ void shouldReleaseConnectionWhenClose() throws Throwable verify( connection ).release(); } - private static UnmanagedTransaction beginTx(Connection connection ) + @Test + void shouldReleaseConnectionOnConnectionAuthorizationExpiredExceptionFailure() + { + AuthorizationExpiredException exception = new AuthorizationExpiredException( "code", "message" ); + Connection connection = connectionWithBegin( handler -> handler.onFailure( exception ) ); + UnmanagedTransaction tx = new UnmanagedTransaction( connection, new DefaultBookmarkHolder(), UNLIMITED_FETCH_SIZE ); + Bookmark bookmark = InternalBookmark.parse( "SomeBookmark" ); + TransactionConfig txConfig = TransactionConfig.empty(); + + AuthorizationExpiredException actualException = assertThrows( AuthorizationExpiredException.class, () -> await( tx.beginAsync( bookmark, txConfig ) ) ); + + assertSame( exception, actualException ); + verify( connection ).terminateAndRelease( AuthorizationExpiredException.DESCRIPTION ); + verify( connection, never() ).release(); + } + + @Test + void shouldReleaseConnectionOnConnectionReadTimeoutExceptionFailure() + { + Connection connection = connectionWithBegin( handler -> handler.onFailure( ConnectionReadTimeoutException.INSTANCE ) ); + UnmanagedTransaction tx = new UnmanagedTransaction( connection, new DefaultBookmarkHolder(), UNLIMITED_FETCH_SIZE ); + Bookmark bookmark = InternalBookmark.parse( "SomeBookmark" ); + TransactionConfig txConfig = TransactionConfig.empty(); + + ConnectionReadTimeoutException actualException = + assertThrows( ConnectionReadTimeoutException.class, () -> await( tx.beginAsync( bookmark, txConfig ) ) ); + + assertSame( ConnectionReadTimeoutException.INSTANCE, actualException ); + verify( connection ).terminateAndRelease( ConnectionReadTimeoutException.INSTANCE.getMessage() ); + verify( connection, never() ).release(); + } + + private static UnmanagedTransaction beginTx( Connection connection ) { return beginTx( connection, InternalBookmark.empty() ); } - private static UnmanagedTransaction beginTx(Connection connection, Bookmark initialBookmark ) + private static UnmanagedTransaction beginTx( Connection connection, Bookmark initialBookmark ) { UnmanagedTransaction tx = new UnmanagedTransaction( connection, new DefaultBookmarkHolder(), UNLIMITED_FETCH_SIZE ); return await( tx.beginAsync( initialBookmark, TransactionConfig.empty() ) ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/connection/ChannelAttributesTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/connection/ChannelAttributesTest.java index d3d294ead2..fef5808d8a 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/connection/ChannelAttributesTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/connection/ChannelAttributesTest.java @@ -32,6 +32,7 @@ import static org.mockito.Mockito.mock; import static org.neo4j.driver.internal.async.connection.ChannelAttributes.authorizationStateListener; import static org.neo4j.driver.internal.async.connection.ChannelAttributes.connectionId; +import static org.neo4j.driver.internal.async.connection.ChannelAttributes.connectionReadTimeout; import static org.neo4j.driver.internal.async.connection.ChannelAttributes.creationTimestamp; import static org.neo4j.driver.internal.async.connection.ChannelAttributes.lastUsedTimestamp; import static org.neo4j.driver.internal.async.connection.ChannelAttributes.messageDispatcher; @@ -41,6 +42,7 @@ import static org.neo4j.driver.internal.async.connection.ChannelAttributes.serverVersion; import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setAuthorizationStateListener; import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setConnectionId; +import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setConnectionReadTimeout; import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setCreationTimestamp; import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setLastUsedTimestamp; import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setMessageDispatcher; @@ -218,4 +220,20 @@ void shouldAllowOverridingAuthorizationStateListener() setAuthorizationStateListener( channel, newListener ); assertEquals( newListener, authorizationStateListener( channel ) ); } + + @Test + void shouldSetAndGetConnectionReadTimeout() + { + long timeout = 15L; + setConnectionReadTimeout( channel, timeout ); + assertEquals( timeout, connectionReadTimeout( channel ).orElse( null ) ); + } + + @Test + void shouldFailToSetConnectionReadTimeoutTwice() + { + long timeout = 15L; + setConnectionReadTimeout( channel, timeout ); + assertThrows( IllegalStateException.class, () -> setConnectionReadTimeout( channel, timeout ) ); + } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/inbound/ConnectionReadTimeoutHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/inbound/ConnectionReadTimeoutHandlerTest.java new file mode 100644 index 0000000000..e0aaec2488 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/async/inbound/ConnectionReadTimeoutHandlerTest.java @@ -0,0 +1,49 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.async.inbound; + +import io.netty.channel.ChannelHandlerContext; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +import org.neo4j.driver.exceptions.ConnectionReadTimeoutException; + +import static org.mockito.BDDMockito.any; +import static org.mockito.BDDMockito.then; +import static org.mockito.BDDMockito.times; +import static org.mockito.Mockito.mock; + +public class ConnectionReadTimeoutHandlerTest +{ + ConnectionReadTimeoutHandler handler = new ConnectionReadTimeoutHandler( 15L, TimeUnit.SECONDS ); + ChannelHandlerContext context = mock( ChannelHandlerContext.class ); + + @Test + void shouldFireConnectionReadTimeoutExceptionAndCloseChannelOnReadTimeOutOnce() + { + // WHEN + IntStream.range( 0, 10 ).forEach( i -> handler.readTimedOut( context ) ); + + // THEN + then( context ).should( times( 1 ) ).fireExceptionCaught( any( ConnectionReadTimeoutException.class ) ); + then( context ).should( times( 1 ) ).close(); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingTableHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingTableHandlerTest.java index 85cd88be53..b725731639 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingTableHandlerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingTableHandlerTest.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashSet; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletionStage; @@ -184,6 +185,12 @@ public void remove( DatabaseName databaseName ) public void removeAged() { } + + @Override + public Optional getRoutingTableHandler( DatabaseName databaseName ) + { + return Optional.empty(); + } }; RoutingTableHandler handler = newRoutingTableHandler( routingTable, rediscovery, connectionPool, registry ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/HelloResponseHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/HelloResponseHandlerTest.java index 2fd3600b6e..bfe0f4c502 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/handlers/HelloResponseHandlerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/HelloResponseHandlerTest.java @@ -48,6 +48,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.neo4j.driver.Values.value; import static org.neo4j.driver.internal.async.connection.ChannelAttributes.connectionId; +import static org.neo4j.driver.internal.async.connection.ChannelAttributes.connectionReadTimeout; import static org.neo4j.driver.internal.async.connection.ChannelAttributes.serverAgent; import static org.neo4j.driver.internal.async.connection.ChannelAttributes.serverVersion; import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setMessageDispatcher; @@ -224,7 +225,68 @@ void shouldCloseChannelOnFailure() throws Exception assertEquals( error, channelPromise.cause() ); } + @Test + void shouldNotThrowWhenConfigurationHintsAreAbsent() + { + ChannelPromise channelPromise = channel.newPromise(); + HelloResponseHandler handler = new HelloResponseHandler( channelPromise, BoltProtocolV41.VERSION ); + + Map metadata = metadata( anyServerVersion(), "bolt-x" ); + handler.onSuccess( metadata ); + + assertTrue( channelPromise.isSuccess() ); + assertFalse( channel.closeFuture().isDone() ); + } + + @Test + void shouldNotThrowWhenConfigurationHintsAreEmpty() + { + ChannelPromise channelPromise = channel.newPromise(); + HelloResponseHandler handler = new HelloResponseHandler( channelPromise, BoltProtocolV41.VERSION ); + + Map metadata = metadata( anyServerVersion(), "bolt-x", value( new HashMap<>() ) ); + handler.onSuccess( metadata ); + + assertTrue( channelPromise.isSuccess() ); + assertFalse( channel.closeFuture().isDone() ); + } + + @Test + void shouldNotThrowWhenConfigurationHintsAreNull() + { + ChannelPromise channelPromise = channel.newPromise(); + HelloResponseHandler handler = new HelloResponseHandler( channelPromise, BoltProtocolV41.VERSION ); + + Map metadata = metadata( anyServerVersion(), "bolt-x", Values.NULL ); + handler.onSuccess( metadata ); + + assertTrue( channelPromise.isSuccess() ); + assertFalse( channel.closeFuture().isDone() ); + } + + @Test + void shouldSetConnectionTimeoutHint() + { + ChannelPromise channelPromise = channel.newPromise(); + HelloResponseHandler handler = new HelloResponseHandler( channelPromise, BoltProtocolV41.VERSION ); + + long timeout = 15L; + Map hints = new HashMap<>(); + hints.put( HelloResponseHandler.CONNECTION_RECEIVE_TIMEOUT_SECONDS_KEY, value( timeout ) ); + Map metadata = metadata( anyServerVersion(), "bolt-x", value( hints ) ); + handler.onSuccess( metadata ); + + assertEquals( timeout, connectionReadTimeout( channel ).orElse( null ) ); + assertTrue( channelPromise.isSuccess() ); + assertFalse( channel.closeFuture().isDone() ); + } + private static Map metadata( Object version, Object connectionId ) + { + return metadata( version, connectionId, null ); + } + + private static Map metadata( Object version, Object connectionId, Value hints ) { Map result = new HashMap<>(); @@ -249,6 +311,7 @@ else if ( version instanceof Value && ((Value) version).isNull() ) { result.put( "connection_id", value( connectionId ) ); } + result.put( HelloResponseHandler.CONFIGURATION_HINTS_KEY, hints ); return result; } diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/RunResponseHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/RunResponseHandlerTest.java index 73cb31ce28..d417cc9db8 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/handlers/RunResponseHandlerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/RunResponseHandlerTest.java @@ -27,6 +27,7 @@ import java.util.concurrent.ExecutionException; import org.neo4j.driver.exceptions.AuthorizationExpiredException; +import org.neo4j.driver.exceptions.ConnectionReadTimeoutException; import org.neo4j.driver.internal.async.UnmanagedTransaction; import org.neo4j.driver.internal.messaging.v3.BoltProtocolV3; import org.neo4j.driver.internal.spi.Connection; @@ -189,6 +190,23 @@ void shouldReleaseConnectionImmediatelyAndFailOnAuthorizationExpiredExceptionFai verify( connection, never() ).release(); } + @Test + void shouldReleaseConnectionImmediatelyAndFailOnConnectionReadTimeoutExceptionFailure() + { + CompletableFuture runFuture = new CompletableFuture<>(); + Connection connection = mock( Connection.class ); + RunResponseHandler handler = new RunResponseHandler( runFuture, BoltProtocolV3.METADATA_EXTRACTOR, connection, null ); + + assertFalse( runFuture.isDone() ); + handler.onFailure( ConnectionReadTimeoutException.INSTANCE ); + + assertTrue( runFuture.isCompletedExceptionally() ); + ConnectionReadTimeoutException actualException = assertThrows( ConnectionReadTimeoutException.class, () -> await( runFuture ) ); + assertSame( ConnectionReadTimeoutException.INSTANCE, actualException ); + verify( connection ).terminateAndRelease( ConnectionReadTimeoutException.INSTANCE.getMessage() ); + verify( connection, never() ).release(); + } + private static void testResultAvailableAfterOnSuccess( String key, MetadataExtractor metadataExtractor ) { RunResponseHandler handler = newHandler( metadataExtractor ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/SessionPullResponseCompletionListenerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/SessionPullResponseCompletionListenerTest.java index cb0cfa7579..4b2cc67f4c 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/handlers/SessionPullResponseCompletionListenerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/SessionPullResponseCompletionListenerTest.java @@ -23,6 +23,8 @@ import java.util.concurrent.CompletableFuture; import org.neo4j.driver.Query; +import org.neo4j.driver.exceptions.AuthorizationExpiredException; +import org.neo4j.driver.exceptions.ConnectionReadTimeoutException; import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.internal.BookmarkHolder; import org.neo4j.driver.internal.InternalBookmark; @@ -35,6 +37,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.singletonMap; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.neo4j.driver.Values.value; @@ -80,13 +83,44 @@ void shouldUpdateBookmarksOnSuccess() verify( bookmarkHolder ).setBookmark( InternalBookmark.parse( bookmarkValue ) ); } + @Test + void shouldReleaseConnectionImmediatelyOnAuthorizationExpiredExceptionFailure() + { + Connection connection = newConnectionMock(); + PullResponseCompletionListener listener = new SessionPullResponseCompletionListener( connection, BookmarkHolder.NO_OP ); + ResponseHandler handler = newHandler( connection, listener ); + AuthorizationExpiredException exception = new AuthorizationExpiredException( "code", "message" ); + + handler.onFailure( exception ); + + verify( connection ).terminateAndRelease( AuthorizationExpiredException.DESCRIPTION ); + verify( connection, never() ).release(); + } + + @Test + void shouldReleaseConnectionImmediatelyOnConnectionReadTimeoutExceptionFailure() + { + Connection connection = newConnectionMock(); + PullResponseCompletionListener listener = new SessionPullResponseCompletionListener( connection, BookmarkHolder.NO_OP ); + ResponseHandler handler = newHandler( connection, listener ); + + handler.onFailure( ConnectionReadTimeoutException.INSTANCE ); + + verify( connection ).terminateAndRelease( ConnectionReadTimeoutException.INSTANCE.getMessage() ); + verify( connection, never() ).release(); + } + private static ResponseHandler newHandler( Connection connection, PullResponseCompletionListener listener ) { RunResponseHandler runHandler = new RunResponseHandler( new CompletableFuture<>(), BoltProtocolV3.METADATA_EXTRACTOR, mock( Connection.class ), null ); BasicPullResponseHandler handler = new BasicPullResponseHandler( new Query( "RETURN 1" ), runHandler, connection, BoltProtocolV3.METADATA_EXTRACTOR, listener ); - handler.installRecordConsumer( ( record, throwable ) -> {} ); - handler.installSummaryConsumer( ( resultSummary, throwable ) -> {} ); + handler.installRecordConsumer( ( record, throwable ) -> + { + } ); + handler.installSummaryConsumer( ( resultSummary, throwable ) -> + { + } ); return handler; } diff --git a/driver/src/test/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogicTest.java b/driver/src/test/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogicTest.java index 1e49198a4c..f55dc6c160 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogicTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogicTest.java @@ -45,6 +45,7 @@ import org.neo4j.driver.Logging; import org.neo4j.driver.exceptions.AuthorizationExpiredException; import org.neo4j.driver.exceptions.ClientException; +import org.neo4j.driver.exceptions.ConnectionReadTimeoutException; import org.neo4j.driver.exceptions.ServiceUnavailableException; import org.neo4j.driver.exceptions.SessionExpiredException; import org.neo4j.driver.exceptions.TransientException; @@ -810,6 +811,28 @@ void doesRetryOnAuthorizationExpiredException() assertEquals( "Done", result ); } + @Test + void doesRetryOnConnectionReadTimeoutException() + { + Clock clock = mock( Clock.class ); + Logging logging = mock( Logging.class ); + Logger logger = mock( Logger.class ); + when( logging.getLog( anyString() ) ).thenReturn( logger ); + ExponentialBackoffRetryLogic logic = new ExponentialBackoffRetryLogic( RetrySettings.DEFAULT, eventExecutor, clock, logging ); + + AtomicBoolean exceptionThrown = new AtomicBoolean( false ); + String result = logic.retry( () -> + { + if ( exceptionThrown.compareAndSet( false, true ) ) + { + throw ConnectionReadTimeoutException.INSTANCE; + } + return "Done"; + } ); + + assertEquals( "Done", result ); + } + @Test void doesNotRetryOnRandomClientException() { diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java index a13bbb7460..b42c72cfd9 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java @@ -32,12 +32,14 @@ import org.neo4j.driver.Result; import org.neo4j.driver.Transaction; import org.neo4j.driver.exceptions.Neo4jException; +import org.neo4j.driver.internal.cluster.RoutingTableRegistry; import org.neo4j.driver.net.ServerAddress; @Getter public class TestkitState { private final Map drivers = new HashMap<>(); + private final Map routingTableRegistry = new HashMap<>(); private final Map sessionStates = new HashMap<>(); private final Map results = new HashMap<>(); private final Map transactions = new HashMap<>(); diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java index 6a8330243c..aafea6e41c 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java @@ -36,7 +36,8 @@ public class GetFeatures implements TestkitRequest { private static final Set FEATURES = new HashSet<>( Arrays.asList( "AuthorizationExpiredTreatment", - "Optimization:PullPipelining" + "Optimization:PullPipelining", + "ConfHint:connection.recv_timeout_seconds" ) ); @Override diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java new file mode 100644 index 0000000000..64457bf42f --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java @@ -0,0 +1,86 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend.messages.requests; + +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import neo4j.org.testkit.backend.TestkitState; +import neo4j.org.testkit.backend.messages.responses.RoutingTable; +import neo4j.org.testkit.backend.messages.responses.TestkitResponse; + +import java.util.Arrays; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.neo4j.driver.internal.BoltServerAddress; +import org.neo4j.driver.internal.DatabaseName; +import org.neo4j.driver.internal.DatabaseNameUtil; +import org.neo4j.driver.internal.cluster.AddressSet; +import org.neo4j.driver.internal.cluster.RoutingTableHandler; +import org.neo4j.driver.internal.cluster.RoutingTableRegistry; + +@Setter +@Getter +@NoArgsConstructor +public class GetRoutingTable implements TestkitRequest +{ + private GetRoutingTableBody data; + + @Override + public TestkitResponse process( TestkitState testkitState ) + { + RoutingTableRegistry routingTableRegistry = testkitState.getRoutingTableRegistry().get( data.getDriverId() ); + if ( routingTableRegistry == null ) + { + throw new IllegalStateException( + String.format( "There is no routing table registry for '%s' driver. (It might be a direct driver)", data.getDriverId() ) ); + } + + DatabaseName databaseName = DatabaseNameUtil.database( data.getDatabase() ); + RoutingTableHandler routingTableHandler = routingTableRegistry.getRoutingTableHandler( databaseName ).orElseThrow( + () -> new IllegalStateException( + String.format( "There is no routing table handler for the '%s' database.", databaseName.databaseName().orElse( "null" ) ) ) ); + + org.neo4j.driver.internal.cluster.RoutingTable routingTable = routingTableHandler.routingTable(); + Function> addressesToStrings = ( addresses ) -> Arrays.stream( addresses.toArray() ) + .map( BoltServerAddress::toString ).collect( Collectors.toList() ); + + return RoutingTable + .builder() + .data( RoutingTable.RoutingTableBody + .builder() + .database( databaseName.databaseName().orElse( null ) ) + .routers( addressesToStrings.apply( routingTable.routers() ) ) + .readers( addressesToStrings.apply( routingTable.readers() ) ) + .writers( addressesToStrings.apply( routingTable.writers() ) ) + .build() + ).build(); + } + + @Setter + @Getter + @NoArgsConstructor + public static class GetRoutingTableBody + { + private String driverId; + private String database; + } +} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java index cdb22390a7..ec9f98a0bc 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java @@ -42,6 +42,7 @@ import org.neo4j.driver.internal.DriverFactory; import org.neo4j.driver.internal.SecuritySettings; import org.neo4j.driver.internal.cluster.RoutingSettings; +import org.neo4j.driver.internal.cluster.loadbalancing.LoadBalancer; import org.neo4j.driver.internal.retry.RetrySettings; import org.neo4j.driver.internal.security.SecurityPlan; import org.neo4j.driver.net.ServerAddressResolver; @@ -91,7 +92,7 @@ public TestkitResponse process( TestkitState testkitState ) org.neo4j.driver.Driver driver; try { - driver = driver( URI.create( data.uri ), authToken, configBuilder.build(), domainNameResolver ); + driver = driver( URI.create( data.uri ), authToken, configBuilder.build(), domainNameResolver, testkitState, id ); } catch ( RuntimeException e ) { @@ -141,14 +142,15 @@ private DomainNameResolver callbackDomainNameResolver( TestkitState testkitState }; } - private org.neo4j.driver.Driver driver( URI uri, AuthToken authToken, Config config, DomainNameResolver domainNameResolver ) + private org.neo4j.driver.Driver driver( URI uri, AuthToken authToken, Config config, DomainNameResolver domainNameResolver, TestkitState testkitState, + String driverId ) { RoutingSettings routingSettings = RoutingSettings.DEFAULT; RetrySettings retrySettings = RetrySettings.DEFAULT; SecuritySettings.SecuritySettingsBuilder securitySettingsBuilder = new SecuritySettings.SecuritySettingsBuilder(); SecuritySettings securitySettings = securitySettingsBuilder.build(); SecurityPlan securityPlan = securitySettings.createSecurityPlan( uri.getScheme() ); - return new DriverFactoryWithDomainNameResolver( domainNameResolver ) + return new DriverFactoryWithDomainNameResolver( domainNameResolver, testkitState, driverId ) .newInstance( uri, authToken, routingSettings, retrySettings, config, securityPlan ); } @@ -183,11 +185,19 @@ public static class NewDriverBody private static class DriverFactoryWithDomainNameResolver extends DriverFactory { private final DomainNameResolver domainNameResolver; + private final TestkitState testkitState; + private final String driverId; @Override protected DomainNameResolver getDomainNameResolver() { return domainNameResolver; } + + @Override + protected void handleNewLoadBalancer( LoadBalancer loadBalancer ) + { + testkitState.getRoutingTableRegistry().put( driverId, loadBalancer.getRoutingTableRegistry() ); + } } } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitRequest.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitRequest.java index 3b63a2157c..0a1afdd964 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitRequest.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitRequest.java @@ -35,7 +35,8 @@ @JsonSubTypes.Type( SessionLastBookmarks.class ), @JsonSubTypes.Type( SessionWriteTransaction.class ), @JsonSubTypes.Type( ResolverResolutionCompleted.class ), @JsonSubTypes.Type( CheckMultiDBSupport.class ), @JsonSubTypes.Type( DomainNameResolutionCompleted.class ), @JsonSubTypes.Type( StartTest.class ), - @JsonSubTypes.Type( TransactionRollback.class ), @JsonSubTypes.Type( GetFeatures.class ) + @JsonSubTypes.Type( TransactionRollback.class ), @JsonSubTypes.Type( GetFeatures.class ), + @JsonSubTypes.Type( GetRoutingTable.class ) } ) public interface TestkitRequest { diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/responses/RoutingTable.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/responses/RoutingTable.java new file mode 100644 index 0000000000..2a802ec1c3 --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/responses/RoutingTable.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend.messages.responses; + +import lombok.Builder; +import lombok.Getter; +import lombok.Setter; + +import java.util.List; + +@Setter +@Getter +@Builder +public class RoutingTable implements TestkitResponse +{ + private RoutingTableBody data; + + @Override + public String testkitName() + { + return "RoutingTable"; + } + + @Setter + @Getter + @Builder + public static class RoutingTableBody + { + private String database; + private long ttl; + private List routers; + private List readers; + private List writers; + } +}