Skip to content

Commit a56490a

Browse files
committed
Use connect timeout in Bolt and TLS handshake
Previously configured connection timeout has only been used to limit amount of time it takes to establish a TCP connection. This is not the only thing driver does to establish a logical connection. It also executes TLS handshake (if configured to use encryption) and Bolt handshake to negotiate protocol version with the database. Later two steps perform reads without any timeout. This could be a problem when database does not respond in time. Also driver might be simply connecting to something that is not the database and never responds. This commit makes driver use configured value of connect timeout as read timeout for TLS and Bolt handshakes. So both will not hang forever when other side does not respond. Default value of connection timeout is 5 seconds. With this commit driver will wait up to 5 seconds for TLS and Bolt handshakes. Timeout is enforced by `ConnectTimeoutHandler` which is added to the channel pipeline when connection is established and removed from it when Bolt handshake completes.
1 parent cfbff4a commit a56490a

File tree

8 files changed

+395
-25
lines changed

8 files changed

+395
-25
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/ChannelConnectorImpl.java

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@
2222
import io.netty.channel.Channel;
2323
import io.netty.channel.ChannelFuture;
2424
import io.netty.channel.ChannelOption;
25+
import io.netty.channel.ChannelPipeline;
2526
import io.netty.channel.ChannelPromise;
2627

2728
import java.util.Map;
2829

2930
import org.neo4j.driver.internal.BoltServerAddress;
3031
import org.neo4j.driver.internal.ConnectionSettings;
32+
import org.neo4j.driver.internal.async.inbound.ConnectTimeoutHandler;
3133
import org.neo4j.driver.internal.security.InternalAuthToken;
3234
import org.neo4j.driver.internal.security.SecurityPlan;
3335
import org.neo4j.driver.internal.util.Clock;
@@ -71,20 +73,47 @@ public ChannelConnectorImpl( ConnectionSettings connectionSettings, SecurityPlan
7173
public ChannelFuture connect( BoltServerAddress address, Bootstrap bootstrap )
7274
{
7375
bootstrap.option( ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMillis );
74-
bootstrap.handler( new NettyChannelInitializer( address, securityPlan, clock, logging ) );
76+
bootstrap.handler( new NettyChannelInitializer( address, securityPlan, connectTimeoutMillis, clock, logging ) );
7577

7678
ChannelFuture channelConnected = bootstrap.connect( address.toSocketAddress() );
7779

7880
Channel channel = channelConnected.channel();
7981
ChannelPromise handshakeCompleted = channel.newPromise();
8082
ChannelPromise connectionInitialized = channel.newPromise();
8183

84+
installChannelConnectedListeners( address, channelConnected, handshakeCompleted );
85+
installHandshakeCompletedListeners( handshakeCompleted, connectionInitialized );
86+
87+
return connectionInitialized;
88+
}
89+
90+
private void installChannelConnectedListeners( BoltServerAddress address, ChannelFuture channelConnected,
91+
ChannelPromise handshakeCompleted )
92+
{
93+
ChannelPipeline pipeline = channelConnected.channel().pipeline();
94+
95+
// add timeout handler to the pipeline when channel is connected. it's needed to limit amount of time code
96+
// spends in TLS and Bolt handshakes. prevents infinite waiting when database does not respond
97+
channelConnected.addListener( future ->
98+
pipeline.addFirst( new ConnectTimeoutHandler( connectTimeoutMillis ) ) );
99+
100+
// add listener that sends Bolt handshake bytes when channel is connected
82101
channelConnected.addListener(
83102
new ChannelConnectedListener( address, pipelineBuilder, handshakeCompleted, logging ) );
84-
handshakeCompleted.addListener(
85-
new HandshakeCompletedListener( userAgent, authToken, connectionInitialized ) );
103+
}
86104

87-
return connectionInitialized;
105+
private void installHandshakeCompletedListeners( ChannelPromise handshakeCompleted,
106+
ChannelPromise connectionInitialized )
107+
{
108+
ChannelPipeline pipeline = handshakeCompleted.channel().pipeline();
109+
110+
// remove timeout handler from the pipeline once TLS and Bolt handshakes are completed. regular protocol
111+
// messages will flow next and we do not want to have read timeout for them
112+
handshakeCompleted.addListener( future -> pipeline.remove( ConnectTimeoutHandler.class ) );
113+
114+
// add listener that sends an INIT message. connection is now fully established. channel pipeline if fully
115+
// set to send/receive messages for a selected protocol version
116+
handshakeCompleted.addListener( new HandshakeCompletedListener( userAgent, authToken, connectionInitialized ) );
88117
}
89118

90119
private static Map<String,Value> tokenAsMap( AuthToken token )

driver/src/main/java/org/neo4j/driver/internal/async/NettyChannelInitializer.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,16 @@ public class NettyChannelInitializer extends ChannelInitializer<Channel>
3939
{
4040
private final BoltServerAddress address;
4141
private final SecurityPlan securityPlan;
42+
private final int connectTimeoutMillis;
4243
private final Clock clock;
4344
private final Logging logging;
4445

45-
public NettyChannelInitializer( BoltServerAddress address, SecurityPlan securityPlan, Clock clock, Logging logging )
46+
public NettyChannelInitializer( BoltServerAddress address, SecurityPlan securityPlan, int connectTimeoutMillis,
47+
Clock clock, Logging logging )
4648
{
4749
this.address = address;
4850
this.securityPlan = securityPlan;
51+
this.connectTimeoutMillis = connectTimeoutMillis;
4952
this.clock = clock;
5053
this.logging = logging;
5154
}
@@ -65,7 +68,9 @@ protected void initChannel( Channel channel )
6568
private SslHandler createSslHandler()
6669
{
6770
SSLEngine sslEngine = createSslEngine();
68-
return new SslHandler( sslEngine );
71+
SslHandler sslHandler = new SslHandler( sslEngine );
72+
sslHandler.setHandshakeTimeoutMillis( connectTimeoutMillis );
73+
return sslHandler;
6974
}
7075

7176
private SSLEngine createSslEngine()
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright (c) 2002-2017 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.async.inbound;
20+
21+
import io.netty.channel.ChannelHandlerContext;
22+
import io.netty.handler.timeout.ReadTimeoutHandler;
23+
24+
import java.util.concurrent.TimeUnit;
25+
26+
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
27+
28+
/**
29+
* Handler needed to limit amount of time connection performs TLS and Bolt handshakes.
30+
* It should only be used when connection is established and removed from the pipeline afterwards.
31+
* Otherwise it will make long running queries fail.
32+
*/
33+
public class ConnectTimeoutHandler extends ReadTimeoutHandler
34+
{
35+
private final long timeoutMillis;
36+
private boolean triggered;
37+
38+
public ConnectTimeoutHandler( long timeoutMillis )
39+
{
40+
super( timeoutMillis, TimeUnit.MILLISECONDS );
41+
this.timeoutMillis = timeoutMillis;
42+
}
43+
44+
@Override
45+
protected void readTimedOut( ChannelHandlerContext ctx )
46+
{
47+
if ( !triggered )
48+
{
49+
triggered = true;
50+
ctx.fireExceptionCaught( unableToConnectError() );
51+
}
52+
}
53+
54+
private ServiceUnavailableException unableToConnectError()
55+
{
56+
return new ServiceUnavailableException( "Unable to establish connection in " + timeoutMillis + "ms" );
57+
}
58+
}

driver/src/test/java/org/neo4j/driver/internal/async/ChannelConnectorImplTest.java

Lines changed: 77 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,24 @@
2121
import io.netty.bootstrap.Bootstrap;
2222
import io.netty.channel.Channel;
2323
import io.netty.channel.ChannelFuture;
24+
import io.netty.channel.ChannelPipeline;
25+
import io.netty.handler.ssl.SslHandler;
2426
import org.junit.After;
2527
import org.junit.Before;
2628
import org.junit.Rule;
2729
import org.junit.Test;
30+
import org.junit.rules.RuleChain;
31+
import org.junit.rules.Timeout;
2832

33+
import java.io.IOException;
2934
import java.net.ConnectException;
35+
import java.net.ServerSocket;
3036
import java.util.concurrent.ExecutionException;
3137
import java.util.concurrent.TimeUnit;
3238

3339
import org.neo4j.driver.internal.BoltServerAddress;
3440
import org.neo4j.driver.internal.ConnectionSettings;
41+
import org.neo4j.driver.internal.async.inbound.ConnectTimeoutHandler;
3542
import org.neo4j.driver.internal.security.SecurityPlan;
3643
import org.neo4j.driver.internal.util.FakeClock;
3744
import org.neo4j.driver.v1.AuthToken;
@@ -42,7 +49,9 @@
4249

4350
import static org.hamcrest.Matchers.instanceOf;
4451
import static org.hamcrest.Matchers.startsWith;
52+
import static org.junit.Assert.assertEquals;
4553
import static org.junit.Assert.assertFalse;
54+
import static org.junit.Assert.assertNotNull;
4655
import static org.junit.Assert.assertNull;
4756
import static org.junit.Assert.assertThat;
4857
import static org.junit.Assert.assertTrue;
@@ -52,19 +61,20 @@
5261

5362
public class ChannelConnectorImplTest
5463
{
64+
private final TestNeo4j neo4j = new TestNeo4j();
5565
@Rule
56-
public final TestNeo4j neo4j = new TestNeo4j();
66+
public final RuleChain ruleChain = RuleChain.outerRule( Timeout.seconds( 20 ) ).around( neo4j );
5767

5868
private Bootstrap bootstrap;
5969

6070
@Before
61-
public void setUp() throws Exception
71+
public void setUp()
6272
{
6373
bootstrap = BootstrapFactory.newBootstrap( 1 );
6474
}
6575

6676
@After
67-
public void tearDown() throws Exception
77+
public void tearDown()
6878
{
6979
if ( bootstrap != null )
7080
{
@@ -75,7 +85,7 @@ public void tearDown() throws Exception
7585
@Test
7686
public void shouldConnect() throws Exception
7787
{
78-
ChannelConnectorImpl connector = newConnector( neo4j.authToken() );
88+
ChannelConnector connector = newConnector( neo4j.authToken() );
7989

8090
ChannelFuture channelFuture = connector.connect( neo4j.address(), bootstrap );
8191
assertTrue( channelFuture.await( 10, TimeUnit.SECONDS ) );
@@ -85,10 +95,26 @@ public void shouldConnect() throws Exception
8595
assertTrue( channel.isActive() );
8696
}
8797

98+
@Test
99+
public void shouldSetupHandlers() throws Exception
100+
{
101+
ChannelConnector connector = newConnector( neo4j.authToken(), SecurityPlan.forAllCertificates(), 10_000 );
102+
103+
ChannelFuture channelFuture = connector.connect( neo4j.address(), bootstrap );
104+
assertTrue( channelFuture.await( 10, TimeUnit.SECONDS ) );
105+
106+
Channel channel = channelFuture.channel();
107+
ChannelPipeline pipeline = channel.pipeline();
108+
assertTrue( channel.isActive() );
109+
110+
assertNotNull( pipeline.get( SslHandler.class ) );
111+
assertNull( pipeline.get( ConnectTimeoutHandler.class ) );
112+
}
113+
88114
@Test
89115
public void shouldFailToConnectToWrongAddress() throws Exception
90116
{
91-
ChannelConnectorImpl connector = newConnector( neo4j.authToken() );
117+
ChannelConnector connector = newConnector( neo4j.authToken() );
92118

93119
ChannelFuture channelFuture = connector.connect( new BoltServerAddress( "wrong-localhost" ), bootstrap );
94120
assertTrue( channelFuture.await( 10, TimeUnit.SECONDS ) );
@@ -112,7 +138,7 @@ public void shouldFailToConnectToWrongAddress() throws Exception
112138
public void shouldFailToConnectWithWrongCredentials() throws Exception
113139
{
114140
AuthToken authToken = AuthTokens.basic( "neo4j", "wrong-password" );
115-
ChannelConnectorImpl connector = newConnector( authToken );
141+
ChannelConnector connector = newConnector( authToken );
116142

117143
ChannelFuture channelFuture = connector.connect( neo4j.address(), bootstrap );
118144
assertTrue( channelFuture.await( 10, TimeUnit.SECONDS ) );
@@ -131,10 +157,10 @@ public void shouldFailToConnectWithWrongCredentials() throws Exception
131157
assertFalse( channel.isActive() );
132158
}
133159

134-
@Test( timeout = 10000 )
160+
@Test
135161
public void shouldEnforceConnectTimeout() throws Exception
136162
{
137-
ChannelConnectorImpl connector = newConnector( neo4j.authToken(), 1000 );
163+
ChannelConnector connector = newConnector( neo4j.authToken(), 1000 );
138164

139165
// try connect to a non-routable ip address 10.0.0.0, it will never respond
140166
ChannelFuture channelFuture = connector.connect( new BoltServerAddress( "10.0.0.0" ), bootstrap );
@@ -151,15 +177,55 @@ public void shouldEnforceConnectTimeout() throws Exception
151177
}
152178
}
153179

180+
@Test
181+
public void shouldFailWhenProtocolNegotiationTakesTooLong() throws Exception
182+
{
183+
// run without TLS so that Bolt handshake is the very first operation after connection is established
184+
testReadTimeoutOnConnect( SecurityPlan.insecure() );
185+
}
186+
187+
@Test
188+
public void shouldFailWhenTLSHandshakeTakesTooLong() throws Exception
189+
{
190+
// run with TLS so that TLS handshake is the very first operation after connection is established
191+
testReadTimeoutOnConnect( SecurityPlan.forAllCertificates() );
192+
}
193+
194+
private void testReadTimeoutOnConnect( SecurityPlan securityPlan ) throws IOException
195+
{
196+
try ( ServerSocket server = new ServerSocket( 0 ) ) // server that accepts connections but does not reply
197+
{
198+
int timeoutMillis = 1_000;
199+
BoltServerAddress address = new BoltServerAddress( "localhost", server.getLocalPort() );
200+
ChannelConnector connector = newConnector( neo4j.authToken(), securityPlan, timeoutMillis );
201+
202+
ChannelFuture channelFuture = connector.connect( address, bootstrap );
203+
try
204+
{
205+
await( channelFuture );
206+
fail( "Exception expected" );
207+
}
208+
catch ( ServiceUnavailableException e )
209+
{
210+
assertEquals( e.getMessage(), "Unable to establish connection in " + timeoutMillis + "ms" );
211+
}
212+
}
213+
}
214+
154215
private ChannelConnectorImpl newConnector( AuthToken authToken ) throws Exception
155216
{
156217
return newConnector( authToken, Integer.MAX_VALUE );
157218
}
158219

159220
private ChannelConnectorImpl newConnector( AuthToken authToken, int connectTimeoutMillis ) throws Exception
160221
{
161-
ConnectionSettings settings = new ConnectionSettings( authToken, 1000 );
162-
return new ChannelConnectorImpl( settings, SecurityPlan.forAllCertificates(), DEV_NULL_LOGGING,
163-
new FakeClock() );
222+
return newConnector( authToken, SecurityPlan.forAllCertificates(), connectTimeoutMillis );
223+
}
224+
225+
private ChannelConnectorImpl newConnector( AuthToken authToken, SecurityPlan securityPlan,
226+
int connectTimeoutMillis )
227+
{
228+
ConnectionSettings settings = new ConnectionSettings( authToken, connectTimeoutMillis );
229+
return new ChannelConnectorImpl( settings, securityPlan, DEV_NULL_LOGGING, new FakeClock() );
164230
}
165231
}

0 commit comments

Comments
 (0)