diff --git a/README.md b/README.md
index 215f9f9b78..b951227d48 100644
--- a/README.md
+++ b/README.md
@@ -128,6 +128,9 @@ Make sure to run build for the whole project and not just for `testkit-tests` mo
- `mvn clean verify -DtestkitArgs='--tests STUB_TESTS'` - runs all project tests and Testkit stub tests.
- `mvn clean verify -DskipTests -P testkit-tests` - skips all project tests and runs Testkit tests.
- `mvn clean verify -DskipTests -DtestkitArgs='--tests STUB_TESTS'` - skips all project tests and runs Testkit stub tests.
+- `mvn clean verify -DskipITs -DtestkitArgs='--tests STUB_TESTS'` - skips all integration tests and runs Testkit stub tests.
+
+If you interrupt Maven build, you have to remove Testkit containers manually.
##### Running Testkit manually
diff --git a/driver/src/main/java/org/neo4j/driver/exceptions/ConnectionReadTimeoutException.java b/driver/src/main/java/org/neo4j/driver/exceptions/ConnectionReadTimeoutException.java
new file mode 100644
index 0000000000..4ed5a66383
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/exceptions/ConnectionReadTimeoutException.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) "Neo4j"
+ * Neo4j Sweden AB [http://neo4j.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.exceptions;
+
+/**
+ * Indicates that read timed out due to it taking longer than the server-supplied timeout value via the {@code connection.recv_timeout_seconds} configuration
+ * hint. The server might provide this value to clients to let them know when a given connection may be considered broken if client does not get any
+ * communication from the server within the specified timeout period. This results in the server being removed from the routing table.
+ */
+public class ConnectionReadTimeoutException extends ServiceUnavailableException
+{
+ public static final ConnectionReadTimeoutException INSTANCE = new ConnectionReadTimeoutException(
+ "Connection read timed out due to it taking longer than the server-supplied timeout value via configuration hint." );
+
+ public ConnectionReadTimeoutException( String message )
+ {
+ super( message );
+ }
+}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java
index 7c8f6bbcc7..fe7e31a631 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java
@@ -207,12 +207,25 @@ protected InternalDriver createDriver( SecurityPlan securityPlan, SessionFactory
* This method is protected only for testing
*/
protected LoadBalancer createLoadBalancer( BoltServerAddress address, ConnectionPool connectionPool,
- EventExecutorGroup eventExecutorGroup, Config config, RoutingSettings routingSettings )
+ EventExecutorGroup eventExecutorGroup, Config config, RoutingSettings routingSettings )
{
LoadBalancingStrategy loadBalancingStrategy = new LeastConnectedLoadBalancingStrategy( connectionPool, config.logging() );
ServerAddressResolver resolver = createResolver( config );
- return new LoadBalancer( address, routingSettings, connectionPool, eventExecutorGroup, createClock(),
- config.logging(), loadBalancingStrategy, resolver, getDomainNameResolver() );
+ LoadBalancer loadBalancer = new LoadBalancer( address, routingSettings, connectionPool, eventExecutorGroup, createClock(),
+ config.logging(), loadBalancingStrategy, resolver, getDomainNameResolver() );
+ handleNewLoadBalancer( loadBalancer );
+ return loadBalancer;
+ }
+
+ /**
+ * Handles new {@link LoadBalancer} instance.
+ *
+ * This method is protected for Testkit backend usage only.
+ *
+ * @param loadBalancer the new load balancer instance.
+ */
+ protected void handleNewLoadBalancer( LoadBalancer loadBalancer )
+ {
}
private static ServerAddressResolver createResolver( Config config )
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/NetworkConnection.java b/driver/src/main/java/org/neo4j/driver/internal/async/NetworkConnection.java
index 6bbbd3fd33..8b6f6c8fbc 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/NetworkConnection.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/NetworkConnection.java
@@ -19,13 +19,18 @@
package org.neo4j.driver.internal.async;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import org.neo4j.driver.Logger;
+import org.neo4j.driver.Logging;
import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.async.connection.ChannelAttributes;
+import org.neo4j.driver.internal.async.inbound.ConnectionReadTimeoutHandler;
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
import org.neo4j.driver.internal.async.pool.ExtendedChannelPool;
import org.neo4j.driver.internal.handlers.ChannelReleasingResetResponseHandler;
@@ -45,13 +50,12 @@
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setTerminationReason;
/**
- * This connection represents a simple network connection to a remote server.
- * It wraps a channel obtained from a connection pool.
- * The life cycle of this connection start from the moment the channel is borrowed out of the pool
- * and end at the time the connection is released back to the pool.
+ * This connection represents a simple network connection to a remote server. It wraps a channel obtained from a connection pool. The life cycle of this
+ * connection start from the moment the channel is borrowed out of the pool and end at the time the connection is released back to the pool.
*/
public class NetworkConnection implements Connection
{
+ private final Logger log;
private final Channel channel;
private final InboundMessageDispatcher messageDispatcher;
private final String serverAgent;
@@ -66,8 +70,12 @@ public class NetworkConnection implements Connection
private final MetricsListener metricsListener;
private final ListenerEvent inUseEvent;
- public NetworkConnection( Channel channel, ExtendedChannelPool channelPool, Clock clock, MetricsListener metricsListener )
+ private final Long connectionReadTimeout;
+ private ChannelHandler connectionReadTimeoutHandler;
+
+ public NetworkConnection( Channel channel, ExtendedChannelPool channelPool, Clock clock, MetricsListener metricsListener, Logging logging )
{
+ this.log = logging.getLog( this.getClass().getCanonicalName() );
this.channel = channel;
this.messageDispatcher = ChannelAttributes.messageDispatcher( channel );
this.serverAgent = ChannelAttributes.serverAgent( channel );
@@ -79,6 +87,7 @@ public NetworkConnection( Channel channel, ExtendedChannelPool channelPool, Cloc
this.clock = clock;
this.metricsListener = metricsListener;
this.inUseEvent = metricsListener.createListenerEvent();
+ this.connectionReadTimeout = ChannelAttributes.connectionReadTimeout( channel ).orElse( null );
metricsListener.afterConnectionCreated( poolId( this.channel ), this.inUseEvent );
}
@@ -225,14 +234,19 @@ private void writeResetMessageIfNeeded( ResponseHandler resetHandler, boolean is
setAutoRead( true );
messageDispatcher.enqueue( resetHandler );
- channel.writeAndFlush( ResetMessage.RESET, channel.voidPromise() );
+ channel.writeAndFlush( ResetMessage.RESET ).addListener( future -> registerConnectionReadTimeout( channel ) );
}
} );
}
private void flushInEventLoop()
{
- channel.eventLoop().execute( channel::flush );
+ channel.eventLoop().execute(
+ () ->
+ {
+ channel.flush();
+ registerConnectionReadTimeout( channel );
+ } );
}
private void writeMessageInEventLoop( Message message, ResponseHandler handler, boolean flush )
@@ -243,7 +257,7 @@ private void writeMessageInEventLoop( Message message, ResponseHandler handler,
if ( flush )
{
- channel.writeAndFlush( message, channel.voidPromise() );
+ channel.writeAndFlush( message ).addListener( future -> registerConnectionReadTimeout( channel ) );
}
else
{
@@ -263,7 +277,7 @@ private void writeMessagesInEventLoop( Message message1, ResponseHandler handler
if ( flush )
{
- channel.writeAndFlush( message2, channel.voidPromise() );
+ channel.writeAndFlush( message2 ).addListener( future -> registerConnectionReadTimeout( channel ) );
}
else
{
@@ -311,6 +325,29 @@ private boolean verifyOpen( ResponseHandler handler1, ResponseHandler handler2 )
}
}
+ private void registerConnectionReadTimeout( Channel channel )
+ {
+ if ( !channel.eventLoop().inEventLoop() )
+ {
+ throw new IllegalStateException( "This method may only be called in the EventLoop" );
+ }
+
+ if ( connectionReadTimeout != null && connectionReadTimeoutHandler == null )
+ {
+ connectionReadTimeoutHandler = new ConnectionReadTimeoutHandler( connectionReadTimeout, TimeUnit.SECONDS );
+ channel.pipeline().addFirst( connectionReadTimeoutHandler );
+ log.debug( "Added ConnectionReadTimeoutHandler" );
+ messageDispatcher.setBeforeLastHandlerHook(
+ ( messageType ) ->
+ {
+ channel.pipeline().remove( connectionReadTimeoutHandler );
+ connectionReadTimeoutHandler = null;
+ messageDispatcher.setBeforeLastHandlerHook( null );
+ log.debug( "Removed ConnectionReadTimeoutHandler" );
+ } );
+ }
+ }
+
private enum Status
{
OPEN,
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java
index 14bb02828c..a46b4d628a 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java
@@ -31,6 +31,7 @@
import org.neo4j.driver.async.ResultCursor;
import org.neo4j.driver.exceptions.AuthorizationExpiredException;
import org.neo4j.driver.exceptions.ClientException;
+import org.neo4j.driver.exceptions.ConnectionReadTimeoutException;
import org.neo4j.driver.internal.BookmarkHolder;
import org.neo4j.driver.internal.cursor.AsyncResultCursor;
import org.neo4j.driver.internal.cursor.RxResultCursor;
@@ -147,6 +148,10 @@ public CompletionStage beginAsync( Bookmark initialBookmar
{
connection.terminateAndRelease( AuthorizationExpiredException.DESCRIPTION );
}
+ else if ( beginError instanceof ConnectionReadTimeoutException )
+ {
+ connection.terminateAndRelease( beginError.getMessage() );
+ }
else
{
connection.release();
@@ -325,6 +330,10 @@ private void handleTransactionCompletion( boolean commitOnSuccess, Throwable thr
{
connection.terminateAndRelease( AuthorizationExpiredException.DESCRIPTION );
}
+ else if ( throwable instanceof ConnectionReadTimeoutException )
+ {
+ connection.terminateAndRelease( throwable.getMessage() );
+ }
else
{
connection.release(); // release in background
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/connection/ChannelAttributes.java b/driver/src/main/java/org/neo4j/driver/internal/async/connection/ChannelAttributes.java
index a8773211ef..f3a8eefc95 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/connection/ChannelAttributes.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/connection/ChannelAttributes.java
@@ -21,6 +21,8 @@
import io.netty.channel.Channel;
import io.netty.util.AttributeKey;
+import java.util.Optional;
+
import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
import org.neo4j.driver.internal.messaging.BoltProtocolVersion;
@@ -42,6 +44,9 @@ public final class ChannelAttributes
private static final AttributeKey TERMINATION_REASON = newInstance( "terminationReason" );
private static final AttributeKey AUTHORIZATION_STATE_LISTENER = newInstance( "authorizationStateListener" );
+ // configuration hints provided by the server
+ private static final AttributeKey CONNECTION_READ_TIMEOUT = newInstance( "connectionReadTimeout" );
+
private ChannelAttributes()
{
}
@@ -156,6 +161,16 @@ public static void setAuthorizationStateListener( Channel channel, Authorization
set( channel, AUTHORIZATION_STATE_LISTENER, authorizationStateListener );
}
+ public static Optional connectionReadTimeout( Channel channel )
+ {
+ return Optional.ofNullable( get( channel, CONNECTION_READ_TIMEOUT ) );
+ }
+
+ public static void setConnectionReadTimeout( Channel channel, Long connectionReadTimeout )
+ {
+ setOnce( channel, CONNECTION_READ_TIMEOUT, connectionReadTimeout );
+ }
+
private static T get( Channel channel, AttributeKey key )
{
return channel.attr( key ).get();
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java
index 015350e319..311da7a0f0 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java
@@ -24,11 +24,12 @@
import java.io.IOException;
-import org.neo4j.driver.internal.logging.ChannelActivityLogger;
-import org.neo4j.driver.internal.util.ErrorUtil;
import org.neo4j.driver.Logger;
import org.neo4j.driver.Logging;
+import org.neo4j.driver.exceptions.ConnectionReadTimeoutException;
import org.neo4j.driver.exceptions.ServiceUnavailableException;
+import org.neo4j.driver.internal.logging.ChannelActivityLogger;
+import org.neo4j.driver.internal.util.ErrorUtil;
import static java.util.Objects.requireNonNull;
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.messageDispatcher;
@@ -94,11 +95,19 @@ public void exceptionCaught( ChannelHandlerContext ctx, Throwable error )
else
{
failed = true;
- log.warn( "Fatal error occurred in the pipeline", error );
+ logUnexpectedErrorWarning( error );
fail( error );
}
}
+ private void logUnexpectedErrorWarning( Throwable error )
+ {
+ if ( !(error instanceof ConnectionReadTimeoutException) )
+ {
+ log.warn( "Fatal error occurred in the pipeline", error );
+ }
+ }
+
private void fail( Throwable error )
{
Throwable cause = transformError( error );
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ConnectionReadTimeoutHandler.java b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ConnectionReadTimeoutHandler.java
new file mode 100644
index 0000000000..81ece4335a
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ConnectionReadTimeoutHandler.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) "Neo4j"
+ * Neo4j Sweden AB [http://neo4j.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal.async.inbound;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.timeout.ReadTimeoutHandler;
+
+import java.util.concurrent.TimeUnit;
+
+import org.neo4j.driver.exceptions.ConnectionReadTimeoutException;
+
+public class ConnectionReadTimeoutHandler extends ReadTimeoutHandler
+{
+ private boolean triggered;
+
+ public ConnectionReadTimeoutHandler( long timeout, TimeUnit unit )
+ {
+ super( timeout, unit );
+ }
+
+ @Override
+ protected void readTimedOut( ChannelHandlerContext ctx )
+ {
+ if ( !triggered )
+ {
+ ctx.fireExceptionCaught( ConnectionReadTimeoutException.INSTANCE );
+ ctx.close();
+ triggered = true;
+ }
+ }
+}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java
index 881c445291..c1f08de21c 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java
@@ -50,6 +50,7 @@ public class InboundMessageDispatcher implements ResponseMessageHandler
private volatile boolean gracefullyClosed;
private Throwable currentError;
private boolean fatalErrorOccurred;
+ private HandlerHook beforeLastHandlerHook;
private ResponseHandler autoReadManagingHandler;
@@ -72,6 +73,15 @@ public void enqueue( ResponseHandler handler )
}
}
+ public void setBeforeLastHandlerHook( HandlerHook beforeLastHandlerHook )
+ {
+ if ( !channel.eventLoop().inEventLoop() )
+ {
+ throw new IllegalStateException( "This method may only be called in the EventLoop" );
+ }
+ this.beforeLastHandlerHook = beforeLastHandlerHook;
+ }
+
public int queuedHandlersCount()
{
return handlers.size();
@@ -81,6 +91,7 @@ public int queuedHandlersCount()
public void handleSuccessMessage( Map meta )
{
log.debug( "S: SUCCESS %s", meta );
+ invokeBeforeLastHandlerHook( HandlerHook.MessageType.SUCCESS );
ResponseHandler handler = removeHandler();
handler.onSuccess( meta );
}
@@ -127,6 +138,7 @@ public void handleFailureMessage( String code, String message )
channel.writeAndFlush( RESET, channel.voidPromise() );
}
+ invokeBeforeLastHandlerHook( HandlerHook.MessageType.FAILURE );
ResponseHandler handler = removeHandler();
handler.onFailure( currentError );
}
@@ -250,4 +262,23 @@ private void updateAutoReadManagingHandler( ResponseHandler newHandler )
}
autoReadManagingHandler = newHandler;
}
+
+ private void invokeBeforeLastHandlerHook( HandlerHook.MessageType messageType )
+ {
+ if ( handlers.size() == 1 && beforeLastHandlerHook != null )
+ {
+ beforeLastHandlerHook.run( messageType );
+ }
+ }
+
+ public interface HandlerHook
+ {
+ enum MessageType
+ {
+ SUCCESS,
+ FAILURE
+ }
+
+ void run( MessageType messageType );
+ }
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java b/driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java
index d8def041cb..479d5bdb08 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java
@@ -71,7 +71,7 @@ public ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, Pool
{
this( connector, bootstrap, new NettyChannelTracker( metricsListener, bootstrap.config().group().next(), logging ),
new NettyChannelHealthChecker( settings, clock, logging ), settings, metricsListener, logging,
- clock, ownsEventLoopGroup, new NetworkConnectionFactory( clock, metricsListener ) );
+ clock, ownsEventLoopGroup, new NetworkConnectionFactory( clock, metricsListener, logging ) );
}
protected ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, NettyChannelTracker nettyChannelTracker,
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/pool/NetworkConnectionFactory.java b/driver/src/main/java/org/neo4j/driver/internal/async/pool/NetworkConnectionFactory.java
index db05a3bf97..1a3404e950 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/pool/NetworkConnectionFactory.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/pool/NetworkConnectionFactory.java
@@ -20,6 +20,7 @@
import io.netty.channel.Channel;
+import org.neo4j.driver.Logging;
import org.neo4j.driver.internal.async.NetworkConnection;
import org.neo4j.driver.internal.metrics.MetricsListener;
import org.neo4j.driver.internal.spi.Connection;
@@ -29,16 +30,18 @@ public class NetworkConnectionFactory implements ConnectionFactory
{
private final Clock clock;
private final MetricsListener metricsListener;
+ private final Logging logging;
- public NetworkConnectionFactory( Clock clock, MetricsListener metricsListener )
+ public NetworkConnectionFactory( Clock clock, MetricsListener metricsListener, Logging logging )
{
this.clock = clock;
this.metricsListener = metricsListener;
+ this.logging = logging;
}
@Override
public Connection createConnection( Channel channel, ExtendedChannelPool pool )
{
- return new NetworkConnection( channel, pool, clock, metricsListener );
+ return new NetworkConnection( channel, pool, clock, metricsListener, logging );
}
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTableRegistry.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTableRegistry.java
index abe833639f..f023f0e1c0 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTableRegistry.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTableRegistry.java
@@ -18,6 +18,7 @@
*/
package org.neo4j.driver.internal.cluster;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionStage;
@@ -52,4 +53,12 @@ public interface RoutingTableRegistry
* Removes all routing tables that has been not used for a long time.
*/
void removeAged();
+
+ /**
+ * Returns routing table handler for the given database name if it exists in the registry.
+ *
+ * @param databaseName the database name
+ * @return the routing table handler for the requested database name
+ */
+ Optional getRoutingTableHandler( DatabaseName databaseName );
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTableRegistryImpl.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTableRegistryImpl.java
index 9c3ba3bceb..018dcb6586 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTableRegistryImpl.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTableRegistryImpl.java
@@ -19,6 +19,7 @@
package org.neo4j.driver.internal.cluster;
import java.util.HashSet;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
@@ -79,14 +80,23 @@ public void remove( DatabaseName databaseName )
@Override
public void removeAged()
{
- routingTableHandlers.forEach( ( databaseName, handler ) -> {
- if ( handler.isRoutingTableAged() )
- {
- logger.info( "Routing table handler for database '%s' is removed because it has not been used for a long time. Routing table: %s",
- databaseName.description(), handler.routingTable() );
- routingTableHandlers.remove( databaseName );
- }
- } );
+ routingTableHandlers.forEach(
+ ( databaseName, handler ) ->
+ {
+ if ( handler.isRoutingTableAged() )
+ {
+ logger.info(
+ "Routing table handler for database '%s' is removed because it has not been used for a long time. Routing table: %s",
+ databaseName.description(), handler.routingTable() );
+ routingTableHandlers.remove( databaseName );
+ }
+ } );
+ }
+
+ @Override
+ public Optional getRoutingTableHandler( DatabaseName databaseName )
+ {
+ return Optional.ofNullable( routingTableHandlers.get( databaseName ) );
}
// For tests
@@ -97,11 +107,13 @@ public boolean contains( DatabaseName databaseName )
private RoutingTableHandler getOrCreate( DatabaseName databaseName )
{
- return routingTableHandlers.computeIfAbsent( databaseName, name -> {
- RoutingTableHandler handler = factory.newInstance( name, this );
- logger.debug( "Routing table handler for database '%s' is added.", databaseName.description() );
- return handler;
- } );
+ return routingTableHandlers.computeIfAbsent(
+ databaseName, name ->
+ {
+ RoutingTableHandler handler = factory.newInstance( name, this );
+ logger.debug( "Routing table handler for database '%s' is added.", databaseName.description() );
+ return handler;
+ } );
}
static class RoutingTableHandlerFactory
diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java
index 7174fdf166..16a741efb8 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java
@@ -175,12 +175,19 @@ public CompletionStage supportsMultiDb()
} );
}
+ public RoutingTableRegistry getRoutingTableRegistry()
+ {
+ return routingTables;
+ }
+
private CompletionStage supportsMultiDb( BoltServerAddress address )
{
- return connectionPool.acquire( address ).thenCompose( conn -> {
- boolean supportsMultiDatabase = supportsMultiDatabase( conn );
- return conn.release().thenApply( ignored -> supportsMultiDatabase );
- } );
+ return connectionPool.acquire( address ).thenCompose(
+ conn ->
+ {
+ boolean supportsMultiDatabase = supportsMultiDatabase( conn );
+ return conn.release().thenApply( ignored -> supportsMultiDatabase );
+ } );
}
private CompletionStage acquire( AccessMode mode, RoutingTable routingTable )
diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/HelloResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/HelloResponseHandler.java
index 95c97849f7..af028cef16 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/handlers/HelloResponseHandler.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/HelloResponseHandler.java
@@ -22,6 +22,8 @@
import io.netty.channel.ChannelPromise;
import java.util.Map;
+import java.util.Optional;
+import java.util.function.Supplier;
import org.neo4j.driver.Value;
import org.neo4j.driver.internal.messaging.BoltProtocolVersion;
@@ -29,6 +31,7 @@
import org.neo4j.driver.internal.spi.ResponseHandler;
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setConnectionId;
+import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setConnectionReadTimeout;
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setServerAgent;
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setServerVersion;
import static org.neo4j.driver.internal.util.MetadataExtractor.extractNeo4jServerVersion;
@@ -38,6 +41,8 @@
public class HelloResponseHandler implements ResponseHandler
{
private static final String CONNECTION_ID_METADATA_KEY = "connection_id";
+ public static final String CONFIGURATION_HINTS_KEY = "hints";
+ public static final String CONNECTION_RECEIVE_TIMEOUT_SECONDS_KEY = "connection.recv_timeout_seconds";
private final ChannelPromise connectionInitializedPromise;
private final Channel channel;
@@ -72,6 +77,8 @@ public void onSuccess( Map metadata )
String connectionId = extractConnectionId( metadata );
setConnectionId( channel, connectionId );
+ processConfigurationHints( metadata );
+
connectionInitializedPromise.setSuccess();
}
catch ( Throwable error )
@@ -103,4 +110,26 @@ private static String extractConnectionId( Map metadata )
}
return value.asString();
}
+
+ private void processConfigurationHints( Map metadata )
+ {
+ Value configurationHints = metadata.get( CONFIGURATION_HINTS_KEY );
+ if ( configurationHints != null )
+ {
+ getFromSupplierOrEmptyOnException( () -> configurationHints.get( CONNECTION_RECEIVE_TIMEOUT_SECONDS_KEY ).asLong() )
+ .ifPresent( timeout -> setConnectionReadTimeout( channel, timeout ) );
+ }
+ }
+
+ private static Optional getFromSupplierOrEmptyOnException( Supplier supplier )
+ {
+ try
+ {
+ return Optional.of( supplier.get() );
+ }
+ catch ( Exception e )
+ {
+ return Optional.empty();
+ }
+ }
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/RoutingResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/RoutingResponseHandler.java
index 8a3d7afd59..02d2f6a898 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/handlers/RoutingResponseHandler.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/RoutingResponseHandler.java
@@ -21,16 +21,16 @@
import java.util.Map;
import java.util.Objects;
-import org.neo4j.driver.internal.BoltServerAddress;
-import org.neo4j.driver.internal.RoutingErrorHandler;
-import org.neo4j.driver.internal.spi.ResponseHandler;
-import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.AccessMode;
import org.neo4j.driver.Value;
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.exceptions.ServiceUnavailableException;
import org.neo4j.driver.exceptions.SessionExpiredException;
import org.neo4j.driver.exceptions.TransientException;
+import org.neo4j.driver.internal.BoltServerAddress;
+import org.neo4j.driver.internal.RoutingErrorHandler;
+import org.neo4j.driver.internal.spi.ResponseHandler;
+import org.neo4j.driver.internal.util.Futures;
import static java.lang.String.format;
diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/RunResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/RunResponseHandler.java
index edc61123d0..31a6a69e5c 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/handlers/RunResponseHandler.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/RunResponseHandler.java
@@ -23,6 +23,7 @@
import org.neo4j.driver.Value;
import org.neo4j.driver.exceptions.AuthorizationExpiredException;
+import org.neo4j.driver.exceptions.ConnectionReadTimeoutException;
import org.neo4j.driver.internal.async.UnmanagedTransaction;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.spi.ResponseHandler;
@@ -70,6 +71,10 @@ else if ( error instanceof AuthorizationExpiredException )
{
connection.terminateAndRelease( AuthorizationExpiredException.DESCRIPTION );
}
+ else if ( error instanceof ConnectionReadTimeoutException )
+ {
+ connection.terminateAndRelease( error.getMessage() );
+ }
runFuture.completeExceptionally( error );
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/SessionPullResponseCompletionListener.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/SessionPullResponseCompletionListener.java
index 73b9473829..5ff2d63dc0 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/handlers/SessionPullResponseCompletionListener.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/SessionPullResponseCompletionListener.java
@@ -22,6 +22,7 @@
import org.neo4j.driver.Value;
import org.neo4j.driver.exceptions.AuthorizationExpiredException;
+import org.neo4j.driver.exceptions.ConnectionReadTimeoutException;
import org.neo4j.driver.internal.BookmarkHolder;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.util.MetadataExtractor;
@@ -53,6 +54,10 @@ public void afterFailure( Throwable error )
{
connection.terminateAndRelease( AuthorizationExpiredException.DESCRIPTION );
}
+ else if ( error instanceof ConnectionReadTimeoutException )
+ {
+ connection.terminateAndRelease( error.getMessage() );
+ }
else
{
releaseConnection();
diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/NetworkConnectionTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/NetworkConnectionTest.java
index d118fb6dcf..ff50cf5234 100644
--- a/driver/src/test/java/org/neo4j/driver/internal/async/NetworkConnectionTest.java
+++ b/driver/src/test/java/org/neo4j/driver/internal/async/NetworkConnectionTest.java
@@ -641,7 +641,7 @@ private static NetworkConnection newConnection( Channel channel )
private static NetworkConnection newConnection( Channel channel, ExtendedChannelPool pool )
{
- return new NetworkConnection( channel, pool, new FakeClock(), DEV_NULL_METRICS );
+ return new NetworkConnection( channel, pool, new FakeClock(), DEV_NULL_METRICS, DEV_NULL_LOGGING );
}
private static void assertConnectionReleasedError( IllegalStateException e )
diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java
index ef1846adf6..b8909e176f 100644
--- a/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java
+++ b/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java
@@ -26,7 +26,9 @@
import org.neo4j.driver.Bookmark;
import org.neo4j.driver.Query;
import org.neo4j.driver.TransactionConfig;
+import org.neo4j.driver.exceptions.AuthorizationExpiredException;
import org.neo4j.driver.exceptions.ClientException;
+import org.neo4j.driver.exceptions.ConnectionReadTimeoutException;
import org.neo4j.driver.internal.DefaultBookmarkHolder;
import org.neo4j.driver.internal.FailableCursor;
import org.neo4j.driver.internal.InternalBookmark;
@@ -38,6 +40,7 @@
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
@@ -276,12 +279,44 @@ void shouldReleaseConnectionWhenClose() throws Throwable
verify( connection ).release();
}
- private static UnmanagedTransaction beginTx(Connection connection )
+ @Test
+ void shouldReleaseConnectionOnConnectionAuthorizationExpiredExceptionFailure()
+ {
+ AuthorizationExpiredException exception = new AuthorizationExpiredException( "code", "message" );
+ Connection connection = connectionWithBegin( handler -> handler.onFailure( exception ) );
+ UnmanagedTransaction tx = new UnmanagedTransaction( connection, new DefaultBookmarkHolder(), UNLIMITED_FETCH_SIZE );
+ Bookmark bookmark = InternalBookmark.parse( "SomeBookmark" );
+ TransactionConfig txConfig = TransactionConfig.empty();
+
+ AuthorizationExpiredException actualException = assertThrows( AuthorizationExpiredException.class, () -> await( tx.beginAsync( bookmark, txConfig ) ) );
+
+ assertSame( exception, actualException );
+ verify( connection ).terminateAndRelease( AuthorizationExpiredException.DESCRIPTION );
+ verify( connection, never() ).release();
+ }
+
+ @Test
+ void shouldReleaseConnectionOnConnectionReadTimeoutExceptionFailure()
+ {
+ Connection connection = connectionWithBegin( handler -> handler.onFailure( ConnectionReadTimeoutException.INSTANCE ) );
+ UnmanagedTransaction tx = new UnmanagedTransaction( connection, new DefaultBookmarkHolder(), UNLIMITED_FETCH_SIZE );
+ Bookmark bookmark = InternalBookmark.parse( "SomeBookmark" );
+ TransactionConfig txConfig = TransactionConfig.empty();
+
+ ConnectionReadTimeoutException actualException =
+ assertThrows( ConnectionReadTimeoutException.class, () -> await( tx.beginAsync( bookmark, txConfig ) ) );
+
+ assertSame( ConnectionReadTimeoutException.INSTANCE, actualException );
+ verify( connection ).terminateAndRelease( ConnectionReadTimeoutException.INSTANCE.getMessage() );
+ verify( connection, never() ).release();
+ }
+
+ private static UnmanagedTransaction beginTx( Connection connection )
{
return beginTx( connection, InternalBookmark.empty() );
}
- private static UnmanagedTransaction beginTx(Connection connection, Bookmark initialBookmark )
+ private static UnmanagedTransaction beginTx( Connection connection, Bookmark initialBookmark )
{
UnmanagedTransaction tx = new UnmanagedTransaction( connection, new DefaultBookmarkHolder(), UNLIMITED_FETCH_SIZE );
return await( tx.beginAsync( initialBookmark, TransactionConfig.empty() ) );
diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/connection/ChannelAttributesTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/connection/ChannelAttributesTest.java
index d3d294ead2..fef5808d8a 100644
--- a/driver/src/test/java/org/neo4j/driver/internal/async/connection/ChannelAttributesTest.java
+++ b/driver/src/test/java/org/neo4j/driver/internal/async/connection/ChannelAttributesTest.java
@@ -32,6 +32,7 @@
import static org.mockito.Mockito.mock;
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.authorizationStateListener;
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.connectionId;
+import static org.neo4j.driver.internal.async.connection.ChannelAttributes.connectionReadTimeout;
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.creationTimestamp;
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.lastUsedTimestamp;
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.messageDispatcher;
@@ -41,6 +42,7 @@
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.serverVersion;
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setAuthorizationStateListener;
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setConnectionId;
+import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setConnectionReadTimeout;
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setCreationTimestamp;
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setLastUsedTimestamp;
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setMessageDispatcher;
@@ -218,4 +220,20 @@ void shouldAllowOverridingAuthorizationStateListener()
setAuthorizationStateListener( channel, newListener );
assertEquals( newListener, authorizationStateListener( channel ) );
}
+
+ @Test
+ void shouldSetAndGetConnectionReadTimeout()
+ {
+ long timeout = 15L;
+ setConnectionReadTimeout( channel, timeout );
+ assertEquals( timeout, connectionReadTimeout( channel ).orElse( null ) );
+ }
+
+ @Test
+ void shouldFailToSetConnectionReadTimeoutTwice()
+ {
+ long timeout = 15L;
+ setConnectionReadTimeout( channel, timeout );
+ assertThrows( IllegalStateException.class, () -> setConnectionReadTimeout( channel, timeout ) );
+ }
}
diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/inbound/ConnectionReadTimeoutHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/inbound/ConnectionReadTimeoutHandlerTest.java
new file mode 100644
index 0000000000..e0aaec2488
--- /dev/null
+++ b/driver/src/test/java/org/neo4j/driver/internal/async/inbound/ConnectionReadTimeoutHandlerTest.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) "Neo4j"
+ * Neo4j Sweden AB [http://neo4j.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal.async.inbound;
+
+import io.netty.channel.ChannelHandlerContext;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
+import org.neo4j.driver.exceptions.ConnectionReadTimeoutException;
+
+import static org.mockito.BDDMockito.any;
+import static org.mockito.BDDMockito.then;
+import static org.mockito.BDDMockito.times;
+import static org.mockito.Mockito.mock;
+
+public class ConnectionReadTimeoutHandlerTest
+{
+ ConnectionReadTimeoutHandler handler = new ConnectionReadTimeoutHandler( 15L, TimeUnit.SECONDS );
+ ChannelHandlerContext context = mock( ChannelHandlerContext.class );
+
+ @Test
+ void shouldFireConnectionReadTimeoutExceptionAndCloseChannelOnReadTimeOutOnce()
+ {
+ // WHEN
+ IntStream.range( 0, 10 ).forEach( i -> handler.readTimedOut( context ) );
+
+ // THEN
+ then( context ).should( times( 1 ) ).fireExceptionCaught( any( ConnectionReadTimeoutException.class ) );
+ then( context ).should( times( 1 ) ).close();
+ }
+}
diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingTableHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingTableHandlerTest.java
index 85cd88be53..b725731639 100644
--- a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingTableHandlerTest.java
+++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingTableHandlerTest.java
@@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionStage;
@@ -184,6 +185,12 @@ public void remove( DatabaseName databaseName )
public void removeAged()
{
}
+
+ @Override
+ public Optional getRoutingTableHandler( DatabaseName databaseName )
+ {
+ return Optional.empty();
+ }
};
RoutingTableHandler handler = newRoutingTableHandler( routingTable, rediscovery, connectionPool, registry );
diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/HelloResponseHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/HelloResponseHandlerTest.java
index 2fd3600b6e..bfe0f4c502 100644
--- a/driver/src/test/java/org/neo4j/driver/internal/handlers/HelloResponseHandlerTest.java
+++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/HelloResponseHandlerTest.java
@@ -48,6 +48,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.neo4j.driver.Values.value;
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.connectionId;
+import static org.neo4j.driver.internal.async.connection.ChannelAttributes.connectionReadTimeout;
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.serverAgent;
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.serverVersion;
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setMessageDispatcher;
@@ -224,7 +225,68 @@ void shouldCloseChannelOnFailure() throws Exception
assertEquals( error, channelPromise.cause() );
}
+ @Test
+ void shouldNotThrowWhenConfigurationHintsAreAbsent()
+ {
+ ChannelPromise channelPromise = channel.newPromise();
+ HelloResponseHandler handler = new HelloResponseHandler( channelPromise, BoltProtocolV41.VERSION );
+
+ Map metadata = metadata( anyServerVersion(), "bolt-x" );
+ handler.onSuccess( metadata );
+
+ assertTrue( channelPromise.isSuccess() );
+ assertFalse( channel.closeFuture().isDone() );
+ }
+
+ @Test
+ void shouldNotThrowWhenConfigurationHintsAreEmpty()
+ {
+ ChannelPromise channelPromise = channel.newPromise();
+ HelloResponseHandler handler = new HelloResponseHandler( channelPromise, BoltProtocolV41.VERSION );
+
+ Map metadata = metadata( anyServerVersion(), "bolt-x", value( new HashMap<>() ) );
+ handler.onSuccess( metadata );
+
+ assertTrue( channelPromise.isSuccess() );
+ assertFalse( channel.closeFuture().isDone() );
+ }
+
+ @Test
+ void shouldNotThrowWhenConfigurationHintsAreNull()
+ {
+ ChannelPromise channelPromise = channel.newPromise();
+ HelloResponseHandler handler = new HelloResponseHandler( channelPromise, BoltProtocolV41.VERSION );
+
+ Map metadata = metadata( anyServerVersion(), "bolt-x", Values.NULL );
+ handler.onSuccess( metadata );
+
+ assertTrue( channelPromise.isSuccess() );
+ assertFalse( channel.closeFuture().isDone() );
+ }
+
+ @Test
+ void shouldSetConnectionTimeoutHint()
+ {
+ ChannelPromise channelPromise = channel.newPromise();
+ HelloResponseHandler handler = new HelloResponseHandler( channelPromise, BoltProtocolV41.VERSION );
+
+ long timeout = 15L;
+ Map hints = new HashMap<>();
+ hints.put( HelloResponseHandler.CONNECTION_RECEIVE_TIMEOUT_SECONDS_KEY, value( timeout ) );
+ Map metadata = metadata( anyServerVersion(), "bolt-x", value( hints ) );
+ handler.onSuccess( metadata );
+
+ assertEquals( timeout, connectionReadTimeout( channel ).orElse( null ) );
+ assertTrue( channelPromise.isSuccess() );
+ assertFalse( channel.closeFuture().isDone() );
+ }
+
private static Map metadata( Object version, Object connectionId )
+ {
+ return metadata( version, connectionId, null );
+ }
+
+ private static Map metadata( Object version, Object connectionId, Value hints )
{
Map result = new HashMap<>();
@@ -249,6 +311,7 @@ else if ( version instanceof Value && ((Value) version).isNull() )
{
result.put( "connection_id", value( connectionId ) );
}
+ result.put( HelloResponseHandler.CONFIGURATION_HINTS_KEY, hints );
return result;
}
diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/RunResponseHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/RunResponseHandlerTest.java
index 73cb31ce28..d417cc9db8 100644
--- a/driver/src/test/java/org/neo4j/driver/internal/handlers/RunResponseHandlerTest.java
+++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/RunResponseHandlerTest.java
@@ -27,6 +27,7 @@
import java.util.concurrent.ExecutionException;
import org.neo4j.driver.exceptions.AuthorizationExpiredException;
+import org.neo4j.driver.exceptions.ConnectionReadTimeoutException;
import org.neo4j.driver.internal.async.UnmanagedTransaction;
import org.neo4j.driver.internal.messaging.v3.BoltProtocolV3;
import org.neo4j.driver.internal.spi.Connection;
@@ -189,6 +190,23 @@ void shouldReleaseConnectionImmediatelyAndFailOnAuthorizationExpiredExceptionFai
verify( connection, never() ).release();
}
+ @Test
+ void shouldReleaseConnectionImmediatelyAndFailOnConnectionReadTimeoutExceptionFailure()
+ {
+ CompletableFuture runFuture = new CompletableFuture<>();
+ Connection connection = mock( Connection.class );
+ RunResponseHandler handler = new RunResponseHandler( runFuture, BoltProtocolV3.METADATA_EXTRACTOR, connection, null );
+
+ assertFalse( runFuture.isDone() );
+ handler.onFailure( ConnectionReadTimeoutException.INSTANCE );
+
+ assertTrue( runFuture.isCompletedExceptionally() );
+ ConnectionReadTimeoutException actualException = assertThrows( ConnectionReadTimeoutException.class, () -> await( runFuture ) );
+ assertSame( ConnectionReadTimeoutException.INSTANCE, actualException );
+ verify( connection ).terminateAndRelease( ConnectionReadTimeoutException.INSTANCE.getMessage() );
+ verify( connection, never() ).release();
+ }
+
private static void testResultAvailableAfterOnSuccess( String key, MetadataExtractor metadataExtractor )
{
RunResponseHandler handler = newHandler( metadataExtractor );
diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/SessionPullResponseCompletionListenerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/SessionPullResponseCompletionListenerTest.java
index cb0cfa7579..4b2cc67f4c 100644
--- a/driver/src/test/java/org/neo4j/driver/internal/handlers/SessionPullResponseCompletionListenerTest.java
+++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/SessionPullResponseCompletionListenerTest.java
@@ -23,6 +23,8 @@
import java.util.concurrent.CompletableFuture;
import org.neo4j.driver.Query;
+import org.neo4j.driver.exceptions.AuthorizationExpiredException;
+import org.neo4j.driver.exceptions.ConnectionReadTimeoutException;
import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.BookmarkHolder;
import org.neo4j.driver.internal.InternalBookmark;
@@ -35,6 +37,7 @@
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.neo4j.driver.Values.value;
@@ -80,13 +83,44 @@ void shouldUpdateBookmarksOnSuccess()
verify( bookmarkHolder ).setBookmark( InternalBookmark.parse( bookmarkValue ) );
}
+ @Test
+ void shouldReleaseConnectionImmediatelyOnAuthorizationExpiredExceptionFailure()
+ {
+ Connection connection = newConnectionMock();
+ PullResponseCompletionListener listener = new SessionPullResponseCompletionListener( connection, BookmarkHolder.NO_OP );
+ ResponseHandler handler = newHandler( connection, listener );
+ AuthorizationExpiredException exception = new AuthorizationExpiredException( "code", "message" );
+
+ handler.onFailure( exception );
+
+ verify( connection ).terminateAndRelease( AuthorizationExpiredException.DESCRIPTION );
+ verify( connection, never() ).release();
+ }
+
+ @Test
+ void shouldReleaseConnectionImmediatelyOnConnectionReadTimeoutExceptionFailure()
+ {
+ Connection connection = newConnectionMock();
+ PullResponseCompletionListener listener = new SessionPullResponseCompletionListener( connection, BookmarkHolder.NO_OP );
+ ResponseHandler handler = newHandler( connection, listener );
+
+ handler.onFailure( ConnectionReadTimeoutException.INSTANCE );
+
+ verify( connection ).terminateAndRelease( ConnectionReadTimeoutException.INSTANCE.getMessage() );
+ verify( connection, never() ).release();
+ }
+
private static ResponseHandler newHandler( Connection connection, PullResponseCompletionListener listener )
{
RunResponseHandler runHandler = new RunResponseHandler( new CompletableFuture<>(), BoltProtocolV3.METADATA_EXTRACTOR, mock( Connection.class ), null );
BasicPullResponseHandler handler =
new BasicPullResponseHandler( new Query( "RETURN 1" ), runHandler, connection, BoltProtocolV3.METADATA_EXTRACTOR, listener );
- handler.installRecordConsumer( ( record, throwable ) -> {} );
- handler.installSummaryConsumer( ( resultSummary, throwable ) -> {} );
+ handler.installRecordConsumer( ( record, throwable ) ->
+ {
+ } );
+ handler.installSummaryConsumer( ( resultSummary, throwable ) ->
+ {
+ } );
return handler;
}
diff --git a/driver/src/test/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogicTest.java b/driver/src/test/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogicTest.java
index 1e49198a4c..f55dc6c160 100644
--- a/driver/src/test/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogicTest.java
+++ b/driver/src/test/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogicTest.java
@@ -45,6 +45,7 @@
import org.neo4j.driver.Logging;
import org.neo4j.driver.exceptions.AuthorizationExpiredException;
import org.neo4j.driver.exceptions.ClientException;
+import org.neo4j.driver.exceptions.ConnectionReadTimeoutException;
import org.neo4j.driver.exceptions.ServiceUnavailableException;
import org.neo4j.driver.exceptions.SessionExpiredException;
import org.neo4j.driver.exceptions.TransientException;
@@ -810,6 +811,28 @@ void doesRetryOnAuthorizationExpiredException()
assertEquals( "Done", result );
}
+ @Test
+ void doesRetryOnConnectionReadTimeoutException()
+ {
+ Clock clock = mock( Clock.class );
+ Logging logging = mock( Logging.class );
+ Logger logger = mock( Logger.class );
+ when( logging.getLog( anyString() ) ).thenReturn( logger );
+ ExponentialBackoffRetryLogic logic = new ExponentialBackoffRetryLogic( RetrySettings.DEFAULT, eventExecutor, clock, logging );
+
+ AtomicBoolean exceptionThrown = new AtomicBoolean( false );
+ String result = logic.retry( () ->
+ {
+ if ( exceptionThrown.compareAndSet( false, true ) )
+ {
+ throw ConnectionReadTimeoutException.INSTANCE;
+ }
+ return "Done";
+ } );
+
+ assertEquals( "Done", result );
+ }
+
@Test
void doesNotRetryOnRandomClientException()
{
diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java
index a13bbb7460..b42c72cfd9 100644
--- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java
+++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java
@@ -32,12 +32,14 @@
import org.neo4j.driver.Result;
import org.neo4j.driver.Transaction;
import org.neo4j.driver.exceptions.Neo4jException;
+import org.neo4j.driver.internal.cluster.RoutingTableRegistry;
import org.neo4j.driver.net.ServerAddress;
@Getter
public class TestkitState
{
private final Map drivers = new HashMap<>();
+ private final Map routingTableRegistry = new HashMap<>();
private final Map sessionStates = new HashMap<>();
private final Map results = new HashMap<>();
private final Map transactions = new HashMap<>();
diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java
index 6a8330243c..aafea6e41c 100644
--- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java
+++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java
@@ -36,7 +36,8 @@ public class GetFeatures implements TestkitRequest
{
private static final Set FEATURES = new HashSet<>( Arrays.asList(
"AuthorizationExpiredTreatment",
- "Optimization:PullPipelining"
+ "Optimization:PullPipelining",
+ "ConfHint:connection.recv_timeout_seconds"
) );
@Override
diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java
new file mode 100644
index 0000000000..64457bf42f
--- /dev/null
+++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) "Neo4j"
+ * Neo4j Sweden AB [http://neo4j.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package neo4j.org.testkit.backend.messages.requests;
+
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import neo4j.org.testkit.backend.TestkitState;
+import neo4j.org.testkit.backend.messages.responses.RoutingTable;
+import neo4j.org.testkit.backend.messages.responses.TestkitResponse;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.neo4j.driver.internal.BoltServerAddress;
+import org.neo4j.driver.internal.DatabaseName;
+import org.neo4j.driver.internal.DatabaseNameUtil;
+import org.neo4j.driver.internal.cluster.AddressSet;
+import org.neo4j.driver.internal.cluster.RoutingTableHandler;
+import org.neo4j.driver.internal.cluster.RoutingTableRegistry;
+
+@Setter
+@Getter
+@NoArgsConstructor
+public class GetRoutingTable implements TestkitRequest
+{
+ private GetRoutingTableBody data;
+
+ @Override
+ public TestkitResponse process( TestkitState testkitState )
+ {
+ RoutingTableRegistry routingTableRegistry = testkitState.getRoutingTableRegistry().get( data.getDriverId() );
+ if ( routingTableRegistry == null )
+ {
+ throw new IllegalStateException(
+ String.format( "There is no routing table registry for '%s' driver. (It might be a direct driver)", data.getDriverId() ) );
+ }
+
+ DatabaseName databaseName = DatabaseNameUtil.database( data.getDatabase() );
+ RoutingTableHandler routingTableHandler = routingTableRegistry.getRoutingTableHandler( databaseName ).orElseThrow(
+ () -> new IllegalStateException(
+ String.format( "There is no routing table handler for the '%s' database.", databaseName.databaseName().orElse( "null" ) ) ) );
+
+ org.neo4j.driver.internal.cluster.RoutingTable routingTable = routingTableHandler.routingTable();
+ Function> addressesToStrings = ( addresses ) -> Arrays.stream( addresses.toArray() )
+ .map( BoltServerAddress::toString ).collect( Collectors.toList() );
+
+ return RoutingTable
+ .builder()
+ .data( RoutingTable.RoutingTableBody
+ .builder()
+ .database( databaseName.databaseName().orElse( null ) )
+ .routers( addressesToStrings.apply( routingTable.routers() ) )
+ .readers( addressesToStrings.apply( routingTable.readers() ) )
+ .writers( addressesToStrings.apply( routingTable.writers() ) )
+ .build()
+ ).build();
+ }
+
+ @Setter
+ @Getter
+ @NoArgsConstructor
+ public static class GetRoutingTableBody
+ {
+ private String driverId;
+ private String database;
+ }
+}
diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java
index cdb22390a7..ec9f98a0bc 100644
--- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java
+++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java
@@ -42,6 +42,7 @@
import org.neo4j.driver.internal.DriverFactory;
import org.neo4j.driver.internal.SecuritySettings;
import org.neo4j.driver.internal.cluster.RoutingSettings;
+import org.neo4j.driver.internal.cluster.loadbalancing.LoadBalancer;
import org.neo4j.driver.internal.retry.RetrySettings;
import org.neo4j.driver.internal.security.SecurityPlan;
import org.neo4j.driver.net.ServerAddressResolver;
@@ -91,7 +92,7 @@ public TestkitResponse process( TestkitState testkitState )
org.neo4j.driver.Driver driver;
try
{
- driver = driver( URI.create( data.uri ), authToken, configBuilder.build(), domainNameResolver );
+ driver = driver( URI.create( data.uri ), authToken, configBuilder.build(), domainNameResolver, testkitState, id );
}
catch ( RuntimeException e )
{
@@ -141,14 +142,15 @@ private DomainNameResolver callbackDomainNameResolver( TestkitState testkitState
};
}
- private org.neo4j.driver.Driver driver( URI uri, AuthToken authToken, Config config, DomainNameResolver domainNameResolver )
+ private org.neo4j.driver.Driver driver( URI uri, AuthToken authToken, Config config, DomainNameResolver domainNameResolver, TestkitState testkitState,
+ String driverId )
{
RoutingSettings routingSettings = RoutingSettings.DEFAULT;
RetrySettings retrySettings = RetrySettings.DEFAULT;
SecuritySettings.SecuritySettingsBuilder securitySettingsBuilder = new SecuritySettings.SecuritySettingsBuilder();
SecuritySettings securitySettings = securitySettingsBuilder.build();
SecurityPlan securityPlan = securitySettings.createSecurityPlan( uri.getScheme() );
- return new DriverFactoryWithDomainNameResolver( domainNameResolver )
+ return new DriverFactoryWithDomainNameResolver( domainNameResolver, testkitState, driverId )
.newInstance( uri, authToken, routingSettings, retrySettings, config, securityPlan );
}
@@ -183,11 +185,19 @@ public static class NewDriverBody
private static class DriverFactoryWithDomainNameResolver extends DriverFactory
{
private final DomainNameResolver domainNameResolver;
+ private final TestkitState testkitState;
+ private final String driverId;
@Override
protected DomainNameResolver getDomainNameResolver()
{
return domainNameResolver;
}
+
+ @Override
+ protected void handleNewLoadBalancer( LoadBalancer loadBalancer )
+ {
+ testkitState.getRoutingTableRegistry().put( driverId, loadBalancer.getRoutingTableRegistry() );
+ }
}
}
diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitRequest.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitRequest.java
index 3b63a2157c..0a1afdd964 100644
--- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitRequest.java
+++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitRequest.java
@@ -35,7 +35,8 @@
@JsonSubTypes.Type( SessionLastBookmarks.class ), @JsonSubTypes.Type( SessionWriteTransaction.class ),
@JsonSubTypes.Type( ResolverResolutionCompleted.class ), @JsonSubTypes.Type( CheckMultiDBSupport.class ),
@JsonSubTypes.Type( DomainNameResolutionCompleted.class ), @JsonSubTypes.Type( StartTest.class ),
- @JsonSubTypes.Type( TransactionRollback.class ), @JsonSubTypes.Type( GetFeatures.class )
+ @JsonSubTypes.Type( TransactionRollback.class ), @JsonSubTypes.Type( GetFeatures.class ),
+ @JsonSubTypes.Type( GetRoutingTable.class )
} )
public interface TestkitRequest
{
diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/responses/RoutingTable.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/responses/RoutingTable.java
new file mode 100644
index 0000000000..2a802ec1c3
--- /dev/null
+++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/responses/RoutingTable.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) "Neo4j"
+ * Neo4j Sweden AB [http://neo4j.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package neo4j.org.testkit.backend.messages.responses;
+
+import lombok.Builder;
+import lombok.Getter;
+import lombok.Setter;
+
+import java.util.List;
+
+@Setter
+@Getter
+@Builder
+public class RoutingTable implements TestkitResponse
+{
+ private RoutingTableBody data;
+
+ @Override
+ public String testkitName()
+ {
+ return "RoutingTable";
+ }
+
+ @Setter
+ @Getter
+ @Builder
+ public static class RoutingTableBody
+ {
+ private String database;
+ private long ttl;
+ private List routers;
+ private List readers;
+ private List writers;
+ }
+}