Skip to content

Commit e1d47cc

Browse files
committed
Add support for connection.recv_timeout_seconds connection hint
1 parent 13e1829 commit e1d47cc

33 files changed

+716
-49
lines changed

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,9 @@ Make sure to run build for the whole project and not just for `testkit-tests` mo
128128
- `mvn clean verify -DtestkitArgs='--tests STUB_TESTS'` - runs all project tests and Testkit stub tests.
129129
- `mvn clean verify -DskipTests -P testkit-tests` - skips all project tests and runs Testkit tests.
130130
- `mvn clean verify -DskipTests -DtestkitArgs='--tests STUB_TESTS'` - skips all project tests and runs Testkit stub tests.
131+
- `mvn clean verify -DskipITs -DtestkitArgs='--tests STUB_TESTS'` - skips all integration tests and runs Testkit stub tests.
132+
133+
If you interrupt Maven build, you have to remove Testkit containers manually.
131134

132135
##### Running Testkit manually
133136

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.exceptions;
20+
21+
/**
22+
* Indicates that read timed out due to it taking longer than the server-supplied timeout value via the {@code connection.recv_timeout_seconds} configuration
23+
* hint. The server might provide this value to clients to let them know when a given connection may be considered broken if client does not get any
24+
* communication from the server within the specified timeout period. This results in the server being removed from the routing table.
25+
*/
26+
public class ConnectionReadTimeoutException extends ServiceUnavailableException
27+
{
28+
public static final ConnectionReadTimeoutException INSTANCE = new ConnectionReadTimeoutException(
29+
"Connection read timed out due to it taking longer than the server-supplied timeout value via configuration hint." );
30+
31+
public ConnectionReadTimeoutException( String message )
32+
{
33+
super( message );
34+
}
35+
}

driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -207,12 +207,25 @@ protected InternalDriver createDriver( SecurityPlan securityPlan, SessionFactory
207207
* <b>This method is protected only for testing</b>
208208
*/
209209
protected LoadBalancer createLoadBalancer( BoltServerAddress address, ConnectionPool connectionPool,
210-
EventExecutorGroup eventExecutorGroup, Config config, RoutingSettings routingSettings )
210+
EventExecutorGroup eventExecutorGroup, Config config, RoutingSettings routingSettings )
211211
{
212212
LoadBalancingStrategy loadBalancingStrategy = new LeastConnectedLoadBalancingStrategy( connectionPool, config.logging() );
213213
ServerAddressResolver resolver = createResolver( config );
214-
return new LoadBalancer( address, routingSettings, connectionPool, eventExecutorGroup, createClock(),
215-
config.logging(), loadBalancingStrategy, resolver, getDomainNameResolver() );
214+
LoadBalancer loadBalancer = new LoadBalancer( address, routingSettings, connectionPool, eventExecutorGroup, createClock(),
215+
config.logging(), loadBalancingStrategy, resolver, getDomainNameResolver() );
216+
handleNewLoadBalancer( loadBalancer );
217+
return loadBalancer;
218+
}
219+
220+
/**
221+
* Handles new {@link LoadBalancer} instance.
222+
* <p>
223+
* <b>This method is protected for Testkit backend usage only.</b>
224+
*
225+
* @param loadBalancer the new load balancer instance.
226+
*/
227+
protected void handleNewLoadBalancer( LoadBalancer loadBalancer )
228+
{
216229
}
217230

218231
private static ServerAddressResolver createResolver( Config config )

driver/src/main/java/org/neo4j/driver/internal/async/NetworkConnection.java

Lines changed: 46 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,18 @@
1919
package org.neo4j.driver.internal.async;
2020

2121
import io.netty.channel.Channel;
22+
import io.netty.channel.ChannelHandler;
2223

2324
import java.util.concurrent.CompletableFuture;
2425
import java.util.concurrent.CompletionStage;
26+
import java.util.concurrent.TimeUnit;
2527
import java.util.concurrent.atomic.AtomicReference;
2628

29+
import org.neo4j.driver.Logger;
30+
import org.neo4j.driver.Logging;
2731
import org.neo4j.driver.internal.BoltServerAddress;
2832
import org.neo4j.driver.internal.async.connection.ChannelAttributes;
33+
import org.neo4j.driver.internal.async.inbound.ConnectionReadTimeoutHandler;
2934
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
3035
import org.neo4j.driver.internal.async.pool.ExtendedChannelPool;
3136
import org.neo4j.driver.internal.handlers.ChannelReleasingResetResponseHandler;
@@ -45,13 +50,12 @@
4550
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setTerminationReason;
4651

4752
/**
48-
* This connection represents a simple network connection to a remote server.
49-
* It wraps a channel obtained from a connection pool.
50-
* The life cycle of this connection start from the moment the channel is borrowed out of the pool
51-
* and end at the time the connection is released back to the pool.
53+
* This connection represents a simple network connection to a remote server. It wraps a channel obtained from a connection pool. The life cycle of this
54+
* connection start from the moment the channel is borrowed out of the pool and end at the time the connection is released back to the pool.
5255
*/
5356
public class NetworkConnection implements Connection
5457
{
58+
private final Logger log;
5559
private final Channel channel;
5660
private final InboundMessageDispatcher messageDispatcher;
5761
private final String serverAgent;
@@ -66,8 +70,12 @@ public class NetworkConnection implements Connection
6670
private final MetricsListener metricsListener;
6771
private final ListenerEvent inUseEvent;
6872

69-
public NetworkConnection( Channel channel, ExtendedChannelPool channelPool, Clock clock, MetricsListener metricsListener )
73+
private final Long connectionReadTimeout;
74+
private ChannelHandler connectionReadTimeoutHandler;
75+
76+
public NetworkConnection( Channel channel, ExtendedChannelPool channelPool, Clock clock, MetricsListener metricsListener, Logging logging )
7077
{
78+
this.log = logging.getLog( this.getClass().getCanonicalName() );
7179
this.channel = channel;
7280
this.messageDispatcher = ChannelAttributes.messageDispatcher( channel );
7381
this.serverAgent = ChannelAttributes.serverAgent( channel );
@@ -79,6 +87,7 @@ public NetworkConnection( Channel channel, ExtendedChannelPool channelPool, Cloc
7987
this.clock = clock;
8088
this.metricsListener = metricsListener;
8189
this.inUseEvent = metricsListener.createListenerEvent();
90+
this.connectionReadTimeout = ChannelAttributes.connectionReadTimeout( channel ).orElse( null );
8291
metricsListener.afterConnectionCreated( poolId( this.channel ), this.inUseEvent );
8392
}
8493

@@ -225,14 +234,19 @@ private void writeResetMessageIfNeeded( ResponseHandler resetHandler, boolean is
225234
setAutoRead( true );
226235

227236
messageDispatcher.enqueue( resetHandler );
228-
channel.writeAndFlush( ResetMessage.RESET, channel.voidPromise() );
237+
channel.writeAndFlush( ResetMessage.RESET ).addListener( future -> registerConnectionReadTimeout( channel ) );
229238
}
230239
} );
231240
}
232241

233242
private void flushInEventLoop()
234243
{
235-
channel.eventLoop().execute( channel::flush );
244+
channel.eventLoop().execute(
245+
() ->
246+
{
247+
channel.flush();
248+
registerConnectionReadTimeout( channel );
249+
} );
236250
}
237251

238252
private void writeMessageInEventLoop( Message message, ResponseHandler handler, boolean flush )
@@ -243,7 +257,7 @@ private void writeMessageInEventLoop( Message message, ResponseHandler handler,
243257

244258
if ( flush )
245259
{
246-
channel.writeAndFlush( message, channel.voidPromise() );
260+
channel.writeAndFlush( message ).addListener( future -> registerConnectionReadTimeout( channel ) );
247261
}
248262
else
249263
{
@@ -263,7 +277,7 @@ private void writeMessagesInEventLoop( Message message1, ResponseHandler handler
263277

264278
if ( flush )
265279
{
266-
channel.writeAndFlush( message2, channel.voidPromise() );
280+
channel.writeAndFlush( message2 ).addListener( future -> registerConnectionReadTimeout( channel ) );
267281
}
268282
else
269283
{
@@ -311,6 +325,29 @@ private boolean verifyOpen( ResponseHandler handler1, ResponseHandler handler2 )
311325
}
312326
}
313327

328+
private void registerConnectionReadTimeout( Channel channel )
329+
{
330+
if ( !channel.eventLoop().inEventLoop() )
331+
{
332+
throw new IllegalStateException( "This method may only be called in the EventLoop" );
333+
}
334+
335+
if ( connectionReadTimeout != null && connectionReadTimeoutHandler == null )
336+
{
337+
connectionReadTimeoutHandler = new ConnectionReadTimeoutHandler( connectionReadTimeout, TimeUnit.SECONDS );
338+
channel.pipeline().addFirst( connectionReadTimeoutHandler );
339+
log.debug( "Added ConnectionReadTimeoutHandler" );
340+
messageDispatcher.setBeforeLastHandlerHook(
341+
( messageType ) ->
342+
{
343+
channel.pipeline().remove( connectionReadTimeoutHandler );
344+
connectionReadTimeoutHandler = null;
345+
messageDispatcher.setBeforeLastHandlerHook( null );
346+
log.debug( "Removed ConnectionReadTimeoutHandler" );
347+
} );
348+
}
349+
}
350+
314351
private enum Status
315352
{
316353
OPEN,

driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.neo4j.driver.async.ResultCursor;
3232
import org.neo4j.driver.exceptions.AuthorizationExpiredException;
3333
import org.neo4j.driver.exceptions.ClientException;
34+
import org.neo4j.driver.exceptions.ConnectionReadTimeoutException;
3435
import org.neo4j.driver.internal.BookmarkHolder;
3536
import org.neo4j.driver.internal.cursor.AsyncResultCursor;
3637
import org.neo4j.driver.internal.cursor.RxResultCursor;
@@ -147,6 +148,10 @@ public CompletionStage<UnmanagedTransaction> beginAsync( Bookmark initialBookmar
147148
{
148149
connection.terminateAndRelease( AuthorizationExpiredException.DESCRIPTION );
149150
}
151+
else if ( beginError instanceof ConnectionReadTimeoutException )
152+
{
153+
connection.terminateAndRelease( beginError.getMessage() );
154+
}
150155
else
151156
{
152157
connection.release();
@@ -325,6 +330,10 @@ private void handleTransactionCompletion( boolean commitOnSuccess, Throwable thr
325330
{
326331
connection.terminateAndRelease( AuthorizationExpiredException.DESCRIPTION );
327332
}
333+
else if ( throwable instanceof ConnectionReadTimeoutException )
334+
{
335+
connection.terminateAndRelease( throwable.getMessage() );
336+
}
328337
else
329338
{
330339
connection.release(); // release in background

driver/src/main/java/org/neo4j/driver/internal/async/connection/ChannelAttributes.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import io.netty.channel.Channel;
2222
import io.netty.util.AttributeKey;
2323

24+
import java.util.Optional;
25+
2426
import org.neo4j.driver.internal.BoltServerAddress;
2527
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
2628
import org.neo4j.driver.internal.messaging.BoltProtocolVersion;
@@ -42,6 +44,9 @@ public final class ChannelAttributes
4244
private static final AttributeKey<String> TERMINATION_REASON = newInstance( "terminationReason" );
4345
private static final AttributeKey<AuthorizationStateListener> AUTHORIZATION_STATE_LISTENER = newInstance( "authorizationStateListener" );
4446

47+
// configuration hints provided by the server
48+
private static final AttributeKey<Long> CONNECTION_READ_TIMEOUT = newInstance( "connectionReadTimeout" );
49+
4550
private ChannelAttributes()
4651
{
4752
}
@@ -156,6 +161,16 @@ public static void setAuthorizationStateListener( Channel channel, Authorization
156161
set( channel, AUTHORIZATION_STATE_LISTENER, authorizationStateListener );
157162
}
158163

164+
public static Optional<Long> connectionReadTimeout( Channel channel )
165+
{
166+
return Optional.ofNullable( get( channel, CONNECTION_READ_TIMEOUT ) );
167+
}
168+
169+
public static void setConnectionReadTimeout( Channel channel, Long connectionReadTimeout )
170+
{
171+
setOnce( channel, CONNECTION_READ_TIMEOUT, connectionReadTimeout );
172+
}
173+
159174
private static <T> T get( Channel channel, AttributeKey<T> key )
160175
{
161176
return channel.attr( key ).get();

driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,12 @@
2424

2525
import java.io.IOException;
2626

27-
import org.neo4j.driver.internal.logging.ChannelActivityLogger;
28-
import org.neo4j.driver.internal.util.ErrorUtil;
2927
import org.neo4j.driver.Logger;
3028
import org.neo4j.driver.Logging;
29+
import org.neo4j.driver.exceptions.ConnectionReadTimeoutException;
3130
import org.neo4j.driver.exceptions.ServiceUnavailableException;
31+
import org.neo4j.driver.internal.logging.ChannelActivityLogger;
32+
import org.neo4j.driver.internal.util.ErrorUtil;
3233

3334
import static java.util.Objects.requireNonNull;
3435
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.messageDispatcher;
@@ -94,11 +95,19 @@ public void exceptionCaught( ChannelHandlerContext ctx, Throwable error )
9495
else
9596
{
9697
failed = true;
97-
log.warn( "Fatal error occurred in the pipeline", error );
98+
logUnexpectedErrorWarning( error );
9899
fail( error );
99100
}
100101
}
101102

103+
private void logUnexpectedErrorWarning( Throwable error )
104+
{
105+
if ( !(error instanceof ConnectionReadTimeoutException) )
106+
{
107+
log.warn( "Fatal error occurred in the pipeline", error );
108+
}
109+
}
110+
102111
private void fail( Throwable error )
103112
{
104113
Throwable cause = transformError( error );
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.async.inbound;
20+
21+
import io.netty.channel.ChannelHandlerContext;
22+
import io.netty.handler.timeout.ReadTimeoutHandler;
23+
24+
import java.util.concurrent.TimeUnit;
25+
26+
import org.neo4j.driver.exceptions.ConnectionReadTimeoutException;
27+
28+
public class ConnectionReadTimeoutHandler extends ReadTimeoutHandler
29+
{
30+
private boolean triggered;
31+
32+
public ConnectionReadTimeoutHandler( long timeout, TimeUnit unit )
33+
{
34+
super( timeout, unit );
35+
}
36+
37+
@Override
38+
protected void readTimedOut( ChannelHandlerContext ctx )
39+
{
40+
if ( !triggered )
41+
{
42+
ctx.fireExceptionCaught( ConnectionReadTimeoutException.INSTANCE );
43+
ctx.close();
44+
triggered = true;
45+
}
46+
}
47+
}

0 commit comments

Comments
 (0)