Skip to content

Add support for connection.recv_timeout_seconds connection hint #968

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ Make sure to run build for the whole project and not just for `testkit-tests` mo
- `mvn clean verify -DtestkitArgs='--tests STUB_TESTS'` - runs all project tests and Testkit stub tests.
- `mvn clean verify -DskipTests -P testkit-tests` - skips all project tests and runs Testkit tests.
- `mvn clean verify -DskipTests -DtestkitArgs='--tests STUB_TESTS'` - skips all project tests and runs Testkit stub tests.
- `mvn clean verify -DskipITs -DtestkitArgs='--tests STUB_TESTS'` - skips all integration tests and runs Testkit stub tests.

If you interrupt Maven build, you have to remove Testkit containers manually.

##### Running Testkit manually

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.exceptions;

/**
* Indicates that read timed out due to it taking longer than the server-supplied timeout value via the {@code connection.recv_timeout_seconds} configuration
* hint. The server might provide this value to clients to let them know when a given connection may be considered broken if client does not get any
* communication from the server within the specified timeout period. This results in the server being removed from the routing table.
*/
public class ConnectionReadTimeoutException extends ServiceUnavailableException
{
public static final ConnectionReadTimeoutException INSTANCE = new ConnectionReadTimeoutException(
"Connection read timed out due to it taking longer than the server-supplied timeout value via configuration hint." );

public ConnectionReadTimeoutException( String message )
{
super( message );
}
}
19 changes: 16 additions & 3 deletions driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -207,12 +207,25 @@ protected InternalDriver createDriver( SecurityPlan securityPlan, SessionFactory
* <b>This method is protected only for testing</b>
*/
protected LoadBalancer createLoadBalancer( BoltServerAddress address, ConnectionPool connectionPool,
EventExecutorGroup eventExecutorGroup, Config config, RoutingSettings routingSettings )
EventExecutorGroup eventExecutorGroup, Config config, RoutingSettings routingSettings )
{
LoadBalancingStrategy loadBalancingStrategy = new LeastConnectedLoadBalancingStrategy( connectionPool, config.logging() );
ServerAddressResolver resolver = createResolver( config );
return new LoadBalancer( address, routingSettings, connectionPool, eventExecutorGroup, createClock(),
config.logging(), loadBalancingStrategy, resolver, getDomainNameResolver() );
LoadBalancer loadBalancer = new LoadBalancer( address, routingSettings, connectionPool, eventExecutorGroup, createClock(),
config.logging(), loadBalancingStrategy, resolver, getDomainNameResolver() );
handleNewLoadBalancer( loadBalancer );
return loadBalancer;
}

/**
* Handles new {@link LoadBalancer} instance.
* <p>
* <b>This method is protected for Testkit backend usage only.</b>
*
* @param loadBalancer the new load balancer instance.
*/
protected void handleNewLoadBalancer( LoadBalancer loadBalancer )
{
}

private static ServerAddressResolver createResolver( Config config )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,18 @@
package org.neo4j.driver.internal.async;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.neo4j.driver.Logger;
import org.neo4j.driver.Logging;
import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.async.connection.ChannelAttributes;
import org.neo4j.driver.internal.async.inbound.ConnectionReadTimeoutHandler;
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
import org.neo4j.driver.internal.async.pool.ExtendedChannelPool;
import org.neo4j.driver.internal.handlers.ChannelReleasingResetResponseHandler;
Expand All @@ -45,13 +50,12 @@
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setTerminationReason;

/**
* This connection represents a simple network connection to a remote server.
* It wraps a channel obtained from a connection pool.
* The life cycle of this connection start from the moment the channel is borrowed out of the pool
* and end at the time the connection is released back to the pool.
* This connection represents a simple network connection to a remote server. It wraps a channel obtained from a connection pool. The life cycle of this
* connection start from the moment the channel is borrowed out of the pool and end at the time the connection is released back to the pool.
*/
public class NetworkConnection implements Connection
{
private final Logger log;
private final Channel channel;
private final InboundMessageDispatcher messageDispatcher;
private final String serverAgent;
Expand All @@ -66,8 +70,12 @@ public class NetworkConnection implements Connection
private final MetricsListener metricsListener;
private final ListenerEvent inUseEvent;

public NetworkConnection( Channel channel, ExtendedChannelPool channelPool, Clock clock, MetricsListener metricsListener )
private final Long connectionReadTimeout;
private ChannelHandler connectionReadTimeoutHandler;

public NetworkConnection( Channel channel, ExtendedChannelPool channelPool, Clock clock, MetricsListener metricsListener, Logging logging )
{
this.log = logging.getLog( this.getClass().getCanonicalName() );
this.channel = channel;
this.messageDispatcher = ChannelAttributes.messageDispatcher( channel );
this.serverAgent = ChannelAttributes.serverAgent( channel );
Expand All @@ -79,6 +87,7 @@ public NetworkConnection( Channel channel, ExtendedChannelPool channelPool, Cloc
this.clock = clock;
this.metricsListener = metricsListener;
this.inUseEvent = metricsListener.createListenerEvent();
this.connectionReadTimeout = ChannelAttributes.connectionReadTimeout( channel ).orElse( null );
metricsListener.afterConnectionCreated( poolId( this.channel ), this.inUseEvent );
}

Expand Down Expand Up @@ -225,14 +234,19 @@ private void writeResetMessageIfNeeded( ResponseHandler resetHandler, boolean is
setAutoRead( true );

messageDispatcher.enqueue( resetHandler );
channel.writeAndFlush( ResetMessage.RESET, channel.voidPromise() );
channel.writeAndFlush( ResetMessage.RESET ).addListener( future -> registerConnectionReadTimeout( channel ) );
}
} );
}

private void flushInEventLoop()
{
channel.eventLoop().execute( channel::flush );
channel.eventLoop().execute(
() ->
{
channel.flush();
registerConnectionReadTimeout( channel );
} );
}

private void writeMessageInEventLoop( Message message, ResponseHandler handler, boolean flush )
Expand All @@ -243,7 +257,7 @@ private void writeMessageInEventLoop( Message message, ResponseHandler handler,

if ( flush )
{
channel.writeAndFlush( message, channel.voidPromise() );
channel.writeAndFlush( message ).addListener( future -> registerConnectionReadTimeout( channel ) );
}
else
{
Expand All @@ -263,7 +277,7 @@ private void writeMessagesInEventLoop( Message message1, ResponseHandler handler

if ( flush )
{
channel.writeAndFlush( message2, channel.voidPromise() );
channel.writeAndFlush( message2 ).addListener( future -> registerConnectionReadTimeout( channel ) );
}
else
{
Expand Down Expand Up @@ -311,6 +325,29 @@ private boolean verifyOpen( ResponseHandler handler1, ResponseHandler handler2 )
}
}

private void registerConnectionReadTimeout( Channel channel )
{
if ( !channel.eventLoop().inEventLoop() )
{
throw new IllegalStateException( "This method may only be called in the EventLoop" );
}

if ( connectionReadTimeout != null && connectionReadTimeoutHandler == null )
{
connectionReadTimeoutHandler = new ConnectionReadTimeoutHandler( connectionReadTimeout, TimeUnit.SECONDS );
channel.pipeline().addFirst( connectionReadTimeoutHandler );
log.debug( "Added ConnectionReadTimeoutHandler" );
messageDispatcher.setBeforeLastHandlerHook(
( messageType ) ->
{
channel.pipeline().remove( connectionReadTimeoutHandler );
connectionReadTimeoutHandler = null;
messageDispatcher.setBeforeLastHandlerHook( null );
log.debug( "Removed ConnectionReadTimeoutHandler" );
} );
}
}

private enum Status
{
OPEN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.neo4j.driver.async.ResultCursor;
import org.neo4j.driver.exceptions.AuthorizationExpiredException;
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.exceptions.ConnectionReadTimeoutException;
import org.neo4j.driver.internal.BookmarkHolder;
import org.neo4j.driver.internal.cursor.AsyncResultCursor;
import org.neo4j.driver.internal.cursor.RxResultCursor;
Expand Down Expand Up @@ -147,6 +148,10 @@ public CompletionStage<UnmanagedTransaction> beginAsync( Bookmark initialBookmar
{
connection.terminateAndRelease( AuthorizationExpiredException.DESCRIPTION );
}
else if ( beginError instanceof ConnectionReadTimeoutException )
{
connection.terminateAndRelease( beginError.getMessage() );
}
else
{
connection.release();
Expand Down Expand Up @@ -325,6 +330,10 @@ private void handleTransactionCompletion( boolean commitOnSuccess, Throwable thr
{
connection.terminateAndRelease( AuthorizationExpiredException.DESCRIPTION );
}
else if ( throwable instanceof ConnectionReadTimeoutException )
{
connection.terminateAndRelease( throwable.getMessage() );
}
else
{
connection.release(); // release in background
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import io.netty.channel.Channel;
import io.netty.util.AttributeKey;

import java.util.Optional;

import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
import org.neo4j.driver.internal.messaging.BoltProtocolVersion;
Expand All @@ -42,6 +44,9 @@ public final class ChannelAttributes
private static final AttributeKey<String> TERMINATION_REASON = newInstance( "terminationReason" );
private static final AttributeKey<AuthorizationStateListener> AUTHORIZATION_STATE_LISTENER = newInstance( "authorizationStateListener" );

// configuration hints provided by the server
private static final AttributeKey<Long> CONNECTION_READ_TIMEOUT = newInstance( "connectionReadTimeout" );

private ChannelAttributes()
{
}
Expand Down Expand Up @@ -156,6 +161,16 @@ public static void setAuthorizationStateListener( Channel channel, Authorization
set( channel, AUTHORIZATION_STATE_LISTENER, authorizationStateListener );
}

public static Optional<Long> connectionReadTimeout( Channel channel )
{
return Optional.ofNullable( get( channel, CONNECTION_READ_TIMEOUT ) );
}

public static void setConnectionReadTimeout( Channel channel, Long connectionReadTimeout )
{
setOnce( channel, CONNECTION_READ_TIMEOUT, connectionReadTimeout );
}

private static <T> T get( Channel channel, AttributeKey<T> key )
{
return channel.attr( key ).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@

import java.io.IOException;

import org.neo4j.driver.internal.logging.ChannelActivityLogger;
import org.neo4j.driver.internal.util.ErrorUtil;
import org.neo4j.driver.Logger;
import org.neo4j.driver.Logging;
import org.neo4j.driver.exceptions.ConnectionReadTimeoutException;
import org.neo4j.driver.exceptions.ServiceUnavailableException;
import org.neo4j.driver.internal.logging.ChannelActivityLogger;
import org.neo4j.driver.internal.util.ErrorUtil;

import static java.util.Objects.requireNonNull;
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.messageDispatcher;
Expand Down Expand Up @@ -94,11 +95,19 @@ public void exceptionCaught( ChannelHandlerContext ctx, Throwable error )
else
{
failed = true;
log.warn( "Fatal error occurred in the pipeline", error );
logUnexpectedErrorWarning( error );
fail( error );
}
}

private void logUnexpectedErrorWarning( Throwable error )
{
if ( !(error instanceof ConnectionReadTimeoutException) )
{
log.warn( "Fatal error occurred in the pipeline", error );
}
}

private void fail( Throwable error )
{
Throwable cause = transformError( error );
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.async.inbound;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.ReadTimeoutHandler;

import java.util.concurrent.TimeUnit;

import org.neo4j.driver.exceptions.ConnectionReadTimeoutException;

public class ConnectionReadTimeoutHandler extends ReadTimeoutHandler
{
private boolean triggered;

public ConnectionReadTimeoutHandler( long timeout, TimeUnit unit )
{
super( timeout, unit );
}

@Override
protected void readTimedOut( ChannelHandlerContext ctx )
{
if ( !triggered )
{
ctx.fireExceptionCaught( ConnectionReadTimeoutException.INSTANCE );
ctx.close();
triggered = true;
}
}
}
Loading