Skip to content

Commit 3eb4c9d

Browse files
authored
Merge pull request #447 from lutovich/1.5-handshake-timeout
Use connect timeout in Bolt and TLS handshake
2 parents 7ad54ec + a56490a commit 3eb4c9d

File tree

8 files changed

+395
-25
lines changed

8 files changed

+395
-25
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/ChannelConnectorImpl.java

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@
2222
import io.netty.channel.Channel;
2323
import io.netty.channel.ChannelFuture;
2424
import io.netty.channel.ChannelOption;
25+
import io.netty.channel.ChannelPipeline;
2526
import io.netty.channel.ChannelPromise;
2627

2728
import java.util.Map;
2829

2930
import org.neo4j.driver.internal.BoltServerAddress;
3031
import org.neo4j.driver.internal.ConnectionSettings;
32+
import org.neo4j.driver.internal.async.inbound.ConnectTimeoutHandler;
3133
import org.neo4j.driver.internal.security.InternalAuthToken;
3234
import org.neo4j.driver.internal.security.SecurityPlan;
3335
import org.neo4j.driver.internal.util.Clock;
@@ -71,20 +73,47 @@ public ChannelConnectorImpl( ConnectionSettings connectionSettings, SecurityPlan
7173
public ChannelFuture connect( BoltServerAddress address, Bootstrap bootstrap )
7274
{
7375
bootstrap.option( ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMillis );
74-
bootstrap.handler( new NettyChannelInitializer( address, securityPlan, clock, logging ) );
76+
bootstrap.handler( new NettyChannelInitializer( address, securityPlan, connectTimeoutMillis, clock, logging ) );
7577

7678
ChannelFuture channelConnected = bootstrap.connect( address.toSocketAddress() );
7779

7880
Channel channel = channelConnected.channel();
7981
ChannelPromise handshakeCompleted = channel.newPromise();
8082
ChannelPromise connectionInitialized = channel.newPromise();
8183

84+
installChannelConnectedListeners( address, channelConnected, handshakeCompleted );
85+
installHandshakeCompletedListeners( handshakeCompleted, connectionInitialized );
86+
87+
return connectionInitialized;
88+
}
89+
90+
private void installChannelConnectedListeners( BoltServerAddress address, ChannelFuture channelConnected,
91+
ChannelPromise handshakeCompleted )
92+
{
93+
ChannelPipeline pipeline = channelConnected.channel().pipeline();
94+
95+
// add timeout handler to the pipeline when channel is connected. it's needed to limit amount of time code
96+
// spends in TLS and Bolt handshakes. prevents infinite waiting when database does not respond
97+
channelConnected.addListener( future ->
98+
pipeline.addFirst( new ConnectTimeoutHandler( connectTimeoutMillis ) ) );
99+
100+
// add listener that sends Bolt handshake bytes when channel is connected
82101
channelConnected.addListener(
83102
new ChannelConnectedListener( address, pipelineBuilder, handshakeCompleted, logging ) );
84-
handshakeCompleted.addListener(
85-
new HandshakeCompletedListener( userAgent, authToken, connectionInitialized ) );
103+
}
86104

87-
return connectionInitialized;
105+
private void installHandshakeCompletedListeners( ChannelPromise handshakeCompleted,
106+
ChannelPromise connectionInitialized )
107+
{
108+
ChannelPipeline pipeline = handshakeCompleted.channel().pipeline();
109+
110+
// remove timeout handler from the pipeline once TLS and Bolt handshakes are completed. regular protocol
111+
// messages will flow next and we do not want to have read timeout for them
112+
handshakeCompleted.addListener( future -> pipeline.remove( ConnectTimeoutHandler.class ) );
113+
114+
// add listener that sends an INIT message. connection is now fully established. channel pipeline if fully
115+
// set to send/receive messages for a selected protocol version
116+
handshakeCompleted.addListener( new HandshakeCompletedListener( userAgent, authToken, connectionInitialized ) );
88117
}
89118

90119
private static Map<String,Value> tokenAsMap( AuthToken token )

driver/src/main/java/org/neo4j/driver/internal/async/NettyChannelInitializer.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,16 @@ public class NettyChannelInitializer extends ChannelInitializer<Channel>
3939
{
4040
private final BoltServerAddress address;
4141
private final SecurityPlan securityPlan;
42+
private final int connectTimeoutMillis;
4243
private final Clock clock;
4344
private final Logging logging;
4445

45-
public NettyChannelInitializer( BoltServerAddress address, SecurityPlan securityPlan, Clock clock, Logging logging )
46+
public NettyChannelInitializer( BoltServerAddress address, SecurityPlan securityPlan, int connectTimeoutMillis,
47+
Clock clock, Logging logging )
4648
{
4749
this.address = address;
4850
this.securityPlan = securityPlan;
51+
this.connectTimeoutMillis = connectTimeoutMillis;
4952
this.clock = clock;
5053
this.logging = logging;
5154
}
@@ -65,7 +68,9 @@ protected void initChannel( Channel channel )
6568
private SslHandler createSslHandler()
6669
{
6770
SSLEngine sslEngine = createSslEngine();
68-
return new SslHandler( sslEngine );
71+
SslHandler sslHandler = new SslHandler( sslEngine );
72+
sslHandler.setHandshakeTimeoutMillis( connectTimeoutMillis );
73+
return sslHandler;
6974
}
7075

7176
private SSLEngine createSslEngine()
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright (c) 2002-2017 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.async.inbound;
20+
21+
import io.netty.channel.ChannelHandlerContext;
22+
import io.netty.handler.timeout.ReadTimeoutHandler;
23+
24+
import java.util.concurrent.TimeUnit;
25+
26+
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
27+
28+
/**
29+
* Handler needed to limit amount of time connection performs TLS and Bolt handshakes.
30+
* It should only be used when connection is established and removed from the pipeline afterwards.
31+
* Otherwise it will make long running queries fail.
32+
*/
33+
public class ConnectTimeoutHandler extends ReadTimeoutHandler
34+
{
35+
private final long timeoutMillis;
36+
private boolean triggered;
37+
38+
public ConnectTimeoutHandler( long timeoutMillis )
39+
{
40+
super( timeoutMillis, TimeUnit.MILLISECONDS );
41+
this.timeoutMillis = timeoutMillis;
42+
}
43+
44+
@Override
45+
protected void readTimedOut( ChannelHandlerContext ctx )
46+
{
47+
if ( !triggered )
48+
{
49+
triggered = true;
50+
ctx.fireExceptionCaught( unableToConnectError() );
51+
}
52+
}
53+
54+
private ServiceUnavailableException unableToConnectError()
55+
{
56+
return new ServiceUnavailableException( "Unable to establish connection in " + timeoutMillis + "ms" );
57+
}
58+
}

driver/src/test/java/org/neo4j/driver/internal/async/ChannelConnectorImplTest.java

Lines changed: 77 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,24 @@
2121
import io.netty.bootstrap.Bootstrap;
2222
import io.netty.channel.Channel;
2323
import io.netty.channel.ChannelFuture;
24+
import io.netty.channel.ChannelPipeline;
25+
import io.netty.handler.ssl.SslHandler;
2426
import org.junit.After;
2527
import org.junit.Before;
2628
import org.junit.Rule;
2729
import org.junit.Test;
30+
import org.junit.rules.RuleChain;
31+
import org.junit.rules.Timeout;
2832

33+
import java.io.IOException;
2934
import java.net.ConnectException;
35+
import java.net.ServerSocket;
3036
import java.util.concurrent.ExecutionException;
3137
import java.util.concurrent.TimeUnit;
3238

3339
import org.neo4j.driver.internal.BoltServerAddress;
3440
import org.neo4j.driver.internal.ConnectionSettings;
41+
import org.neo4j.driver.internal.async.inbound.ConnectTimeoutHandler;
3542
import org.neo4j.driver.internal.security.SecurityPlan;
3643
import org.neo4j.driver.internal.util.FakeClock;
3744
import org.neo4j.driver.v1.AuthToken;
@@ -42,7 +49,9 @@
4249

4350
import static org.hamcrest.Matchers.instanceOf;
4451
import static org.hamcrest.Matchers.startsWith;
52+
import static org.junit.Assert.assertEquals;
4553
import static org.junit.Assert.assertFalse;
54+
import static org.junit.Assert.assertNotNull;
4655
import static org.junit.Assert.assertNull;
4756
import static org.junit.Assert.assertThat;
4857
import static org.junit.Assert.assertTrue;
@@ -52,19 +61,20 @@
5261

5362
public class ChannelConnectorImplTest
5463
{
64+
private final TestNeo4j neo4j = new TestNeo4j();
5565
@Rule
56-
public final TestNeo4j neo4j = new TestNeo4j();
66+
public final RuleChain ruleChain = RuleChain.outerRule( Timeout.seconds( 20 ) ).around( neo4j );
5767

5868
private Bootstrap bootstrap;
5969

6070
@Before
61-
public void setUp() throws Exception
71+
public void setUp()
6272
{
6373
bootstrap = BootstrapFactory.newBootstrap( 1 );
6474
}
6575

6676
@After
67-
public void tearDown() throws Exception
77+
public void tearDown()
6878
{
6979
if ( bootstrap != null )
7080
{
@@ -75,7 +85,7 @@ public void tearDown() throws Exception
7585
@Test
7686
public void shouldConnect() throws Exception
7787
{
78-
ChannelConnectorImpl connector = newConnector( neo4j.authToken() );
88+
ChannelConnector connector = newConnector( neo4j.authToken() );
7989

8090
ChannelFuture channelFuture = connector.connect( neo4j.address(), bootstrap );
8191
assertTrue( channelFuture.await( 10, TimeUnit.SECONDS ) );
@@ -85,10 +95,26 @@ public void shouldConnect() throws Exception
8595
assertTrue( channel.isActive() );
8696
}
8797

98+
@Test
99+
public void shouldSetupHandlers() throws Exception
100+
{
101+
ChannelConnector connector = newConnector( neo4j.authToken(), SecurityPlan.forAllCertificates(), 10_000 );
102+
103+
ChannelFuture channelFuture = connector.connect( neo4j.address(), bootstrap );
104+
assertTrue( channelFuture.await( 10, TimeUnit.SECONDS ) );
105+
106+
Channel channel = channelFuture.channel();
107+
ChannelPipeline pipeline = channel.pipeline();
108+
assertTrue( channel.isActive() );
109+
110+
assertNotNull( pipeline.get( SslHandler.class ) );
111+
assertNull( pipeline.get( ConnectTimeoutHandler.class ) );
112+
}
113+
88114
@Test
89115
public void shouldFailToConnectToWrongAddress() throws Exception
90116
{
91-
ChannelConnectorImpl connector = newConnector( neo4j.authToken() );
117+
ChannelConnector connector = newConnector( neo4j.authToken() );
92118

93119
ChannelFuture channelFuture = connector.connect( new BoltServerAddress( "wrong-localhost" ), bootstrap );
94120
assertTrue( channelFuture.await( 10, TimeUnit.SECONDS ) );
@@ -112,7 +138,7 @@ public void shouldFailToConnectToWrongAddress() throws Exception
112138
public void shouldFailToConnectWithWrongCredentials() throws Exception
113139
{
114140
AuthToken authToken = AuthTokens.basic( "neo4j", "wrong-password" );
115-
ChannelConnectorImpl connector = newConnector( authToken );
141+
ChannelConnector connector = newConnector( authToken );
116142

117143
ChannelFuture channelFuture = connector.connect( neo4j.address(), bootstrap );
118144
assertTrue( channelFuture.await( 10, TimeUnit.SECONDS ) );
@@ -131,10 +157,10 @@ public void shouldFailToConnectWithWrongCredentials() throws Exception
131157
assertFalse( channel.isActive() );
132158
}
133159

134-
@Test( timeout = 10000 )
160+
@Test
135161
public void shouldEnforceConnectTimeout() throws Exception
136162
{
137-
ChannelConnectorImpl connector = newConnector( neo4j.authToken(), 1000 );
163+
ChannelConnector connector = newConnector( neo4j.authToken(), 1000 );
138164

139165
// try connect to a non-routable ip address 10.0.0.0, it will never respond
140166
ChannelFuture channelFuture = connector.connect( new BoltServerAddress( "10.0.0.0" ), bootstrap );
@@ -151,15 +177,55 @@ public void shouldEnforceConnectTimeout() throws Exception
151177
}
152178
}
153179

180+
@Test
181+
public void shouldFailWhenProtocolNegotiationTakesTooLong() throws Exception
182+
{
183+
// run without TLS so that Bolt handshake is the very first operation after connection is established
184+
testReadTimeoutOnConnect( SecurityPlan.insecure() );
185+
}
186+
187+
@Test
188+
public void shouldFailWhenTLSHandshakeTakesTooLong() throws Exception
189+
{
190+
// run with TLS so that TLS handshake is the very first operation after connection is established
191+
testReadTimeoutOnConnect( SecurityPlan.forAllCertificates() );
192+
}
193+
194+
private void testReadTimeoutOnConnect( SecurityPlan securityPlan ) throws IOException
195+
{
196+
try ( ServerSocket server = new ServerSocket( 0 ) ) // server that accepts connections but does not reply
197+
{
198+
int timeoutMillis = 1_000;
199+
BoltServerAddress address = new BoltServerAddress( "localhost", server.getLocalPort() );
200+
ChannelConnector connector = newConnector( neo4j.authToken(), securityPlan, timeoutMillis );
201+
202+
ChannelFuture channelFuture = connector.connect( address, bootstrap );
203+
try
204+
{
205+
await( channelFuture );
206+
fail( "Exception expected" );
207+
}
208+
catch ( ServiceUnavailableException e )
209+
{
210+
assertEquals( e.getMessage(), "Unable to establish connection in " + timeoutMillis + "ms" );
211+
}
212+
}
213+
}
214+
154215
private ChannelConnectorImpl newConnector( AuthToken authToken ) throws Exception
155216
{
156217
return newConnector( authToken, Integer.MAX_VALUE );
157218
}
158219

159220
private ChannelConnectorImpl newConnector( AuthToken authToken, int connectTimeoutMillis ) throws Exception
160221
{
161-
ConnectionSettings settings = new ConnectionSettings( authToken, 1000 );
162-
return new ChannelConnectorImpl( settings, SecurityPlan.forAllCertificates(), DEV_NULL_LOGGING,
163-
new FakeClock() );
222+
return newConnector( authToken, SecurityPlan.forAllCertificates(), connectTimeoutMillis );
223+
}
224+
225+
private ChannelConnectorImpl newConnector( AuthToken authToken, SecurityPlan securityPlan,
226+
int connectTimeoutMillis )
227+
{
228+
ConnectionSettings settings = new ConnectionSettings( authToken, connectTimeoutMillis );
229+
return new ChannelConnectorImpl( settings, securityPlan, DEV_NULL_LOGGING, new FakeClock() );
164230
}
165231
}

0 commit comments

Comments
 (0)