Skip to content

Commit 62b25a9

Browse files
authored
Merge pull request #413 from lutovich/1.5-close-channel-on-init-failure
Make sure network channel is closed on handshake or INIT failure
2 parents 9c9e61d + 8b588f7 commit 62b25a9

14 files changed

+457
-66
lines changed

driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.neo4j.driver.internal.async.AsyncConnectorImpl;
2929
import org.neo4j.driver.internal.async.BootstrapFactory;
3030
import org.neo4j.driver.internal.async.Futures;
31-
import org.neo4j.driver.internal.async.pool.ActiveChannelTracker;
3231
import org.neo4j.driver.internal.async.pool.AsyncConnectionPool;
3332
import org.neo4j.driver.internal.async.pool.AsyncConnectionPoolImpl;
3433
import org.neo4j.driver.internal.cluster.RoutingContext;
@@ -107,16 +106,13 @@ private AsyncConnectionPool createAsyncConnectionPool( AuthToken authToken, Secu
107106
Bootstrap bootstrap, Config config )
108107
{
109108
Clock clock = createClock();
110-
ConnectionSettings connectionSettings = new ConnectionSettings( authToken, config.connectionTimeoutMillis() );
111-
ActiveChannelTracker activeChannelTracker = new ActiveChannelTracker( config.logging() );
112-
AsyncConnectorImpl connector = new AsyncConnectorImpl( connectionSettings, securityPlan,
113-
activeChannelTracker, config.logging(), clock );
109+
ConnectionSettings settings = new ConnectionSettings( authToken, config.connectionTimeoutMillis() );
110+
AsyncConnectorImpl connector = new AsyncConnectorImpl( settings, securityPlan, config.logging(), clock );
114111
PoolSettings poolSettings = new PoolSettings( config.maxIdleConnectionPoolSize(),
115112
config.idleTimeBeforeConnectionTest(), config.maxConnectionLifetimeMillis(),
116113
config.maxConnectionPoolSize(),
117114
config.connectionAcquisitionTimeoutMillis() );
118-
return new AsyncConnectionPoolImpl( connector, bootstrap, activeChannelTracker, poolSettings, config.logging(),
119-
clock );
115+
return new AsyncConnectionPoolImpl( connector, bootstrap, poolSettings, config.logging(), clock );
120116
}
121117

122118
private Driver createDriver( URI uri, BoltServerAddress address, ConnectionPool connectionPool,

driver/src/main/java/org/neo4j/driver/internal/async/AsyncConnectorImpl.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import io.netty.channel.ChannelFuture;
2424
import io.netty.channel.ChannelOption;
2525
import io.netty.channel.ChannelPromise;
26-
import io.netty.channel.pool.ChannelPoolHandler;
2726

2827
import java.util.Map;
2928

@@ -46,18 +45,16 @@ public class AsyncConnectorImpl implements AsyncConnector
4645
private final Map<String,Value> authToken;
4746
private final SecurityPlan securityPlan;
4847
private final int connectTimeoutMillis;
49-
private final ChannelPoolHandler channelPoolHandler;
5048
private final Logging logging;
5149
private final Clock clock;
5250

53-
public AsyncConnectorImpl( ConnectionSettings connectionSettings, SecurityPlan securityPlan,
54-
ChannelPoolHandler channelPoolHandler, Logging logging, Clock clock )
51+
public AsyncConnectorImpl( ConnectionSettings connectionSettings, SecurityPlan securityPlan, Logging logging,
52+
Clock clock )
5553
{
5654
this.userAgent = connectionSettings.userAgent();
5755
this.authToken = tokenAsMap( connectionSettings.authToken() );
5856
this.connectTimeoutMillis = connectionSettings.connectTimeoutMillis();
5957
this.securityPlan = requireNonNull( securityPlan );
60-
this.channelPoolHandler = requireNonNull( channelPoolHandler );
6158
this.logging = requireNonNull( logging );
6259
this.clock = requireNonNull( clock );
6360
}
@@ -66,7 +63,7 @@ public AsyncConnectorImpl( ConnectionSettings connectionSettings, SecurityPlan s
6663
public ChannelFuture connect( BoltServerAddress address, Bootstrap bootstrap )
6764
{
6865
bootstrap.option( ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMillis );
69-
bootstrap.handler( new NettyChannelInitializer( address, securityPlan, channelPoolHandler, clock ) );
66+
bootstrap.handler( new NettyChannelInitializer( address, securityPlan, clock ) );
7067

7168
ChannelFuture channelConnected = bootstrap.connect( address.toSocketAddress() );
7269

driver/src/main/java/org/neo4j/driver/internal/async/ChannelConnectedListener.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -55,15 +55,11 @@ public void operationComplete( ChannelFuture future )
5555
channel.pipeline().addLast( new HandshakeResponseHandler( handshakeCompletedPromise, logging ) );
5656
ChannelFuture handshakeFuture = channel.writeAndFlush( handshake() );
5757

58-
handshakeFuture.addListener( new ChannelFutureListener()
58+
handshakeFuture.addListener( channelFuture ->
5959
{
60-
@Override
61-
public void operationComplete( ChannelFuture future ) throws Exception
60+
if ( !channelFuture.isSuccess() )
6261
{
63-
if ( !future.isSuccess() )
64-
{
65-
handshakeCompletedPromise.setFailure( future.cause() );
66-
}
62+
handshakeCompletedPromise.setFailure( channelFuture.cause() );
6763
}
6864
} );
6965
}

driver/src/main/java/org/neo4j/driver/internal/async/HandshakeResponseHandler.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,13 @@ public HandshakeResponseHandler( ChannelPromise handshakeCompletedPromise, Loggi
5454
}
5555

5656
@Override
57-
public void exceptionCaught( ChannelHandlerContext ctx, Throwable cause ) throws Exception
57+
public void exceptionCaught( ChannelHandlerContext ctx, Throwable cause )
5858
{
5959
fail( ctx, cause );
6060
}
6161

6262
@Override
63-
protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out ) throws Exception
63+
protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out )
6464
{
6565
int serverSuggestedVersion = in.readInt();
6666
log.debug( "Server suggested protocol version: %s", serverSuggestedVersion );
@@ -99,8 +99,7 @@ protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out )
9999

100100
private void fail( ChannelHandlerContext ctx, Throwable error )
101101
{
102-
ctx.close();
103-
handshakeCompletedPromise.setFailure( error );
102+
ctx.close().addListener( future -> handshakeCompletedPromise.setFailure( error ) );
104103
}
105104

106105
private static Throwable protocolNoSupportedByServerError()

driver/src/main/java/org/neo4j/driver/internal/async/NettyChannelInitializer.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import io.netty.channel.Channel;
2222
import io.netty.channel.ChannelInitializer;
23-
import io.netty.channel.pool.ChannelPoolHandler;
2423
import io.netty.handler.ssl.SslHandler;
2524

2625
import javax.net.ssl.SSLContext;
@@ -40,20 +39,17 @@ public class NettyChannelInitializer extends ChannelInitializer<Channel>
4039
{
4140
private final BoltServerAddress address;
4241
private final SecurityPlan securityPlan;
43-
private final ChannelPoolHandler channelPoolHandler;
4442
private final Clock clock;
4543

46-
public NettyChannelInitializer( BoltServerAddress address, SecurityPlan securityPlan,
47-
ChannelPoolHandler channelPoolHandler, Clock clock )
44+
public NettyChannelInitializer( BoltServerAddress address, SecurityPlan securityPlan, Clock clock )
4845
{
4946
this.address = address;
5047
this.securityPlan = securityPlan;
51-
this.channelPoolHandler = channelPoolHandler;
5248
this.clock = clock;
5349
}
5450

5551
@Override
56-
protected void initChannel( Channel channel ) throws Exception
52+
protected void initChannel( Channel channel )
5753
{
5854
if ( securityPlan.requiresEncryption() )
5955
{
@@ -62,8 +58,6 @@ protected void initChannel( Channel channel ) throws Exception
6258
}
6359

6460
updateChannelAttributes( channel );
65-
66-
channelPoolHandler.channelCreated( channel );
6761
}
6862

6963
private SslHandler createSslHandler()

driver/src/main/java/org/neo4j/driver/internal/async/pool/AsyncConnectionPoolImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,12 @@ public class AsyncConnectionPoolImpl implements AsyncConnectionPool
5252
private final ConcurrentMap<BoltServerAddress,ChannelPool> pools = new ConcurrentHashMap<>();
5353
private final AtomicBoolean closed = new AtomicBoolean();
5454

55-
public AsyncConnectionPoolImpl( AsyncConnector connector, Bootstrap bootstrap,
56-
ActiveChannelTracker activeChannelTracker, PoolSettings settings, Logging logging, Clock clock )
55+
public AsyncConnectionPoolImpl( AsyncConnector connector, Bootstrap bootstrap, PoolSettings settings,
56+
Logging logging, Clock clock )
5757
{
5858
this.connector = connector;
5959
this.bootstrap = bootstrap;
60-
this.activeChannelTracker = activeChannelTracker;
60+
this.activeChannelTracker = new ActiveChannelTracker( logging );
6161
this.channelHealthChecker = new NettyChannelHealthChecker( settings, clock );
6262
this.settings = settings;
6363
this.clock = clock;

driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelPool.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,15 @@ public NettyChannelPool( BoltServerAddress address, AsyncConnector connector, Bo
5757
@Override
5858
protected ChannelFuture connectChannel( Bootstrap bootstrap )
5959
{
60-
return connector.connect( address, bootstrap );
60+
ChannelFuture channelFuture = connector.connect( address, bootstrap );
61+
channelFuture.addListener( future ->
62+
{
63+
if ( future.isSuccess() )
64+
{
65+
// notify pool handler about a successful connection
66+
handler().channelCreated( channelFuture.channel() );
67+
}
68+
} );
69+
return channelFuture;
6170
}
6271
}

driver/src/main/java/org/neo4j/driver/internal/handlers/AsyncInitResponseHandler.java

Lines changed: 12 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919
package org.neo4j.driver.internal.handlers;
2020

2121
import io.netty.channel.Channel;
22-
import io.netty.channel.ChannelFuture;
23-
import io.netty.channel.ChannelFutureListener;
2422
import io.netty.channel.ChannelPipeline;
2523
import io.netty.channel.ChannelPromise;
2624

@@ -32,7 +30,6 @@
3230
import org.neo4j.driver.v1.Value;
3331

3432
import static org.neo4j.driver.internal.async.ChannelAttributes.setServerVersion;
35-
import static org.neo4j.driver.internal.util.ServerVersion.version;
3633

3734
public class AsyncInitResponseHandler implements ResponseHandler
3835
{
@@ -50,14 +47,9 @@ public void onSuccess( Map<String,Value> metadata )
5047
{
5148
try
5249
{
53-
Value versionValue = metadata.get( "server" );
54-
if ( versionValue != null )
55-
{
56-
String versionString = versionValue.asString();
57-
ServerVersion version = version( versionString );
58-
setServerVersion( channel, version );
59-
updatePipelineIfNeeded( version, channel.pipeline() );
60-
}
50+
ServerVersion serverVersion = extractServerVersion( metadata );
51+
setServerVersion( channel, serverVersion );
52+
updatePipelineIfNeeded( serverVersion, channel.pipeline() );
6153
connectionInitializedPromise.setSuccess();
6254
}
6355
catch ( Throwable error )
@@ -68,16 +60,9 @@ public void onSuccess( Map<String,Value> metadata )
6860
}
6961

7062
@Override
71-
public void onFailure( final Throwable error )
63+
public void onFailure( Throwable error )
7264
{
73-
channel.close().addListener( new ChannelFutureListener()
74-
{
75-
@Override
76-
public void operationComplete( ChannelFuture future ) throws Exception
77-
{
78-
connectionInitializedPromise.setFailure( error );
79-
}
80-
} );
65+
channel.close().addListener( future -> connectionInitializedPromise.setFailure( error ) );
8166
}
8267

8368
@Override
@@ -86,6 +71,13 @@ public void onRecord( Value[] fields )
8671
throw new UnsupportedOperationException();
8772
}
8873

74+
private static ServerVersion extractServerVersion( Map<String,Value> metadata )
75+
{
76+
Value versionValue = metadata.get( "server" );
77+
boolean versionAbsent = versionValue == null || versionValue.isNull();
78+
return versionAbsent ? ServerVersion.v3_0_0 : ServerVersion.version( versionValue.asString() );
79+
}
80+
8981
private static void updatePipelineIfNeeded( ServerVersion serverVersion, ChannelPipeline pipeline )
9082
{
9183
if ( serverVersion.lessThan( ServerVersion.v3_2_0 ) )

driver/src/test/java/org/neo4j/driver/internal/async/AsyncConnectorImplTest.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import io.netty.bootstrap.Bootstrap;
2222
import io.netty.channel.Channel;
2323
import io.netty.channel.ChannelFuture;
24-
import io.netty.channel.pool.ChannelPoolHandler;
2524
import org.junit.After;
2625
import org.junit.Before;
2726
import org.junit.Rule;
@@ -48,7 +47,6 @@
4847
import static org.junit.Assert.assertThat;
4948
import static org.junit.Assert.assertTrue;
5049
import static org.junit.Assert.fail;
51-
import static org.mockito.Mockito.mock;
5250
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
5351
import static org.neo4j.driver.v1.util.TestUtil.await;
5452

@@ -70,7 +68,7 @@ public void tearDown() throws Exception
7068
{
7169
if ( bootstrap != null )
7270
{
73-
bootstrap.config().group().shutdownGracefully();
71+
bootstrap.config().group().shutdownGracefully().syncUninterruptibly();
7472
}
7573
}
7674

@@ -161,7 +159,6 @@ private AsyncConnectorImpl newConnector( AuthToken authToken ) throws Exception
161159
private AsyncConnectorImpl newConnector( AuthToken authToken, int connectTimeoutMillis ) throws Exception
162160
{
163161
ConnectionSettings settings = new ConnectionSettings( authToken, 1000 );
164-
return new AsyncConnectorImpl( settings, SecurityPlan.forAllCertificates(),
165-
mock( ChannelPoolHandler.class ), DEV_NULL_LOGGING, new FakeClock() );
162+
return new AsyncConnectorImpl( settings, SecurityPlan.forAllCertificates(), DEV_NULL_LOGGING, new FakeClock() );
166163
}
167164
}

driver/src/test/java/org/neo4j/driver/internal/async/HandshakeResponseHandlerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,13 @@ public class HandshakeResponseHandlerTest
5151
private final EmbeddedChannel channel = new EmbeddedChannel();
5252

5353
@Before
54-
public void setUp() throws Exception
54+
public void setUp()
5555
{
5656
setMessageDispatcher( channel, new InboundMessageDispatcher( channel, DEV_NULL_LOGGING ) );
5757
}
5858

5959
@After
60-
public void tearDown() throws Exception
60+
public void tearDown()
6161
{
6262
channel.close();
6363
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright (c) 2002-2017 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.async;
20+
21+
import io.netty.channel.embedded.EmbeddedChannel;
22+
import io.netty.handler.ssl.SslHandler;
23+
import org.junit.Test;
24+
25+
import org.neo4j.driver.internal.security.SecurityPlan;
26+
import org.neo4j.driver.internal.util.Clock;
27+
import org.neo4j.driver.internal.util.FakeClock;
28+
29+
import static org.junit.Assert.assertEquals;
30+
import static org.junit.Assert.assertNotNull;
31+
import static org.junit.Assert.assertNull;
32+
import static org.mockito.Mockito.mock;
33+
import static org.mockito.Mockito.when;
34+
import static org.neo4j.driver.internal.async.ChannelAttributes.creationTimestamp;
35+
import static org.neo4j.driver.internal.async.ChannelAttributes.messageDispatcher;
36+
import static org.neo4j.driver.internal.async.ChannelAttributes.serverAddress;
37+
import static org.neo4j.driver.internal.net.BoltServerAddress.LOCAL_DEFAULT;
38+
39+
public class NettyChannelInitializerTest
40+
{
41+
@Test
42+
public void shouldAddSslHandlerWhenRequiresEncryption() throws Exception
43+
{
44+
SecurityPlan security = SecurityPlan.forAllCertificates();
45+
NettyChannelInitializer initializer = new NettyChannelInitializer( LOCAL_DEFAULT, security, new FakeClock() );
46+
47+
EmbeddedChannel channel = new EmbeddedChannel();
48+
initializer.initChannel( channel );
49+
50+
assertNotNull( channel.pipeline().get( SslHandler.class ) );
51+
}
52+
53+
@Test
54+
public void shouldNotAddSslHandlerWhenDoesNotRequireEncryption()
55+
{
56+
SecurityPlan security = SecurityPlan.insecure();
57+
NettyChannelInitializer initializer = new NettyChannelInitializer( LOCAL_DEFAULT, security, new FakeClock() );
58+
59+
EmbeddedChannel channel = new EmbeddedChannel();
60+
initializer.initChannel( channel );
61+
62+
assertNull( channel.pipeline().get( SslHandler.class ) );
63+
}
64+
65+
@Test
66+
public void shouldUpdateChannelAttributes()
67+
{
68+
Clock clock = mock( Clock.class );
69+
when( clock.millis() ).thenReturn( 42L );
70+
SecurityPlan security = SecurityPlan.insecure();
71+
NettyChannelInitializer initializer = new NettyChannelInitializer( LOCAL_DEFAULT, security, clock );
72+
73+
EmbeddedChannel channel = new EmbeddedChannel();
74+
initializer.initChannel( channel );
75+
76+
assertEquals( LOCAL_DEFAULT, serverAddress( channel ) );
77+
assertEquals( 42L, creationTimestamp( channel ) );
78+
assertNotNull( messageDispatcher( channel ) );
79+
}
80+
}

driver/src/test/java/org/neo4j/driver/internal/async/pool/AsyncConnectionPoolImplTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -167,11 +167,10 @@ private AsyncConnectionPoolImpl newPool() throws Exception
167167
{
168168
FakeClock clock = new FakeClock();
169169
ConnectionSettings connectionSettings = new ConnectionSettings( neo4j.authToken(), 5000 );
170-
ActiveChannelTracker poolHandler = new ActiveChannelTracker( DEV_NULL_LOGGING );
171170
AsyncConnectorImpl connector = new AsyncConnectorImpl( connectionSettings, SecurityPlan.forAllCertificates(),
172-
poolHandler, DEV_NULL_LOGGING, clock );
171+
DEV_NULL_LOGGING, clock );
173172
PoolSettings poolSettings = new PoolSettings( 5, -1, -1, 10, 5000 );
174173
Bootstrap bootstrap = BootstrapFactory.newBootstrap( 1 );
175-
return new AsyncConnectionPoolImpl( connector, bootstrap, poolHandler, poolSettings, DEV_NULL_LOGGING, clock );
174+
return new AsyncConnectionPoolImpl( connector, bootstrap, poolSettings, DEV_NULL_LOGGING, clock );
176175
}
177176
}

0 commit comments

Comments
 (0)