Skip to content

Commit 8b588f7

Browse files
committed
Treat channel as broken when connection failed
Channel connection executed by `AsyncConnectorImpl` is a three step process: 1) connect netty channel 2) perform bolt handshake 3) send INIT message When either of these steps fail we should close (possibly open) network channel from step (1) and not track this channel in the pool or in the set of active channels. This commit makes sure pool only notifies `ActiveChannelTracker` about created channel when all three steps are successful. Also added various unit tests around connection establishment.
1 parent 65d1d42 commit 8b588f7

13 files changed

+382
-84
lines changed

driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.neo4j.driver.internal.async.AsyncConnectorImpl;
2929
import org.neo4j.driver.internal.async.BootstrapFactory;
3030
import org.neo4j.driver.internal.async.Futures;
31-
import org.neo4j.driver.internal.async.pool.ActiveChannelTracker;
3231
import org.neo4j.driver.internal.async.pool.AsyncConnectionPool;
3332
import org.neo4j.driver.internal.async.pool.AsyncConnectionPoolImpl;
3433
import org.neo4j.driver.internal.cluster.RoutingContext;
@@ -107,16 +106,13 @@ private AsyncConnectionPool createAsyncConnectionPool( AuthToken authToken, Secu
107106
Bootstrap bootstrap, Config config )
108107
{
109108
Clock clock = createClock();
110-
ConnectionSettings connectionSettings = new ConnectionSettings( authToken, config.connectionTimeoutMillis() );
111-
ActiveChannelTracker activeChannelTracker = new ActiveChannelTracker( config.logging() );
112-
AsyncConnectorImpl connector = new AsyncConnectorImpl( connectionSettings, securityPlan,
113-
activeChannelTracker, config.logging(), clock );
109+
ConnectionSettings settings = new ConnectionSettings( authToken, config.connectionTimeoutMillis() );
110+
AsyncConnectorImpl connector = new AsyncConnectorImpl( settings, securityPlan, config.logging(), clock );
114111
PoolSettings poolSettings = new PoolSettings( config.maxIdleConnectionPoolSize(),
115112
config.idleTimeBeforeConnectionTest(), config.maxConnectionLifetimeMillis(),
116113
config.maxConnectionPoolSize(),
117114
config.connectionAcquisitionTimeoutMillis() );
118-
return new AsyncConnectionPoolImpl( connector, bootstrap, activeChannelTracker, poolSettings, config.logging(),
119-
clock );
115+
return new AsyncConnectionPoolImpl( connector, bootstrap, poolSettings, config.logging(), clock );
120116
}
121117

122118
private Driver createDriver( URI uri, BoltServerAddress address, ConnectionPool connectionPool,

driver/src/main/java/org/neo4j/driver/internal/async/AsyncConnectorImpl.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import io.netty.channel.ChannelFuture;
2424
import io.netty.channel.ChannelOption;
2525
import io.netty.channel.ChannelPromise;
26-
import io.netty.channel.pool.ChannelPoolHandler;
2726

2827
import java.util.Map;
2928

@@ -46,18 +45,16 @@ public class AsyncConnectorImpl implements AsyncConnector
4645
private final Map<String,Value> authToken;
4746
private final SecurityPlan securityPlan;
4847
private final int connectTimeoutMillis;
49-
private final ChannelPoolHandler channelPoolHandler;
5048
private final Logging logging;
5149
private final Clock clock;
5250

53-
public AsyncConnectorImpl( ConnectionSettings connectionSettings, SecurityPlan securityPlan,
54-
ChannelPoolHandler channelPoolHandler, Logging logging, Clock clock )
51+
public AsyncConnectorImpl( ConnectionSettings connectionSettings, SecurityPlan securityPlan, Logging logging,
52+
Clock clock )
5553
{
5654
this.userAgent = connectionSettings.userAgent();
5755
this.authToken = tokenAsMap( connectionSettings.authToken() );
5856
this.connectTimeoutMillis = connectionSettings.connectTimeoutMillis();
5957
this.securityPlan = requireNonNull( securityPlan );
60-
this.channelPoolHandler = requireNonNull( channelPoolHandler );
6158
this.logging = requireNonNull( logging );
6259
this.clock = requireNonNull( clock );
6360
}
@@ -66,7 +63,7 @@ public AsyncConnectorImpl( ConnectionSettings connectionSettings, SecurityPlan s
6663
public ChannelFuture connect( BoltServerAddress address, Bootstrap bootstrap )
6764
{
6865
bootstrap.option( ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMillis );
69-
bootstrap.handler( new NettyChannelInitializer( address, securityPlan, channelPoolHandler, clock ) );
66+
bootstrap.handler( new NettyChannelInitializer( address, securityPlan, clock ) );
7067

7168
ChannelFuture channelConnected = bootstrap.connect( address.toSocketAddress() );
7269

driver/src/main/java/org/neo4j/driver/internal/async/HandshakeResponseHandler.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,7 @@ protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out )
9999

100100
private void fail( ChannelHandlerContext ctx, Throwable error )
101101
{
102-
ctx.close();
103-
handshakeCompletedPromise.setFailure( error );
102+
ctx.close().addListener( future -> handshakeCompletedPromise.setFailure( error ) );
104103
}
105104

106105
private static Throwable protocolNoSupportedByServerError()

driver/src/main/java/org/neo4j/driver/internal/async/NettyChannelInitializer.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import io.netty.channel.Channel;
2222
import io.netty.channel.ChannelInitializer;
23-
import io.netty.channel.pool.ChannelPoolHandler;
2423
import io.netty.handler.ssl.SslHandler;
2524

2625
import javax.net.ssl.SSLContext;
@@ -40,20 +39,17 @@ public class NettyChannelInitializer extends ChannelInitializer<Channel>
4039
{
4140
private final BoltServerAddress address;
4241
private final SecurityPlan securityPlan;
43-
private final ChannelPoolHandler channelPoolHandler;
4442
private final Clock clock;
4543

46-
public NettyChannelInitializer( BoltServerAddress address, SecurityPlan securityPlan,
47-
ChannelPoolHandler channelPoolHandler, Clock clock )
44+
public NettyChannelInitializer( BoltServerAddress address, SecurityPlan securityPlan, Clock clock )
4845
{
4946
this.address = address;
5047
this.securityPlan = securityPlan;
51-
this.channelPoolHandler = channelPoolHandler;
5248
this.clock = clock;
5349
}
5450

5551
@Override
56-
protected void initChannel( Channel channel ) throws Exception
52+
protected void initChannel( Channel channel )
5753
{
5854
if ( securityPlan.requiresEncryption() )
5955
{
@@ -62,8 +58,6 @@ protected void initChannel( Channel channel ) throws Exception
6258
}
6359

6460
updateChannelAttributes( channel );
65-
66-
channelPoolHandler.channelCreated( channel );
6761
}
6862

6963
private SslHandler createSslHandler()

driver/src/main/java/org/neo4j/driver/internal/async/pool/AsyncConnectionPoolImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,12 @@ public class AsyncConnectionPoolImpl implements AsyncConnectionPool
5252
private final ConcurrentMap<BoltServerAddress,ChannelPool> pools = new ConcurrentHashMap<>();
5353
private final AtomicBoolean closed = new AtomicBoolean();
5454

55-
public AsyncConnectionPoolImpl( AsyncConnector connector, Bootstrap bootstrap,
56-
ActiveChannelTracker activeChannelTracker, PoolSettings settings, Logging logging, Clock clock )
55+
public AsyncConnectionPoolImpl( AsyncConnector connector, Bootstrap bootstrap, PoolSettings settings,
56+
Logging logging, Clock clock )
5757
{
5858
this.connector = connector;
5959
this.bootstrap = bootstrap;
60-
this.activeChannelTracker = activeChannelTracker;
60+
this.activeChannelTracker = new ActiveChannelTracker( logging );
6161
this.channelHealthChecker = new NettyChannelHealthChecker( settings, clock );
6262
this.settings = settings;
6363
this.clock = clock;

driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelPool.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,15 @@ public NettyChannelPool( BoltServerAddress address, AsyncConnector connector, Bo
5757
@Override
5858
protected ChannelFuture connectChannel( Bootstrap bootstrap )
5959
{
60-
return connector.connect( address, bootstrap );
60+
ChannelFuture channelFuture = connector.connect( address, bootstrap );
61+
channelFuture.addListener( future ->
62+
{
63+
if ( future.isSuccess() )
64+
{
65+
// notify pool handler about a successful connection
66+
handler().channelCreated( channelFuture.channel() );
67+
}
68+
} );
69+
return channelFuture;
6170
}
6271
}

driver/src/main/java/org/neo4j/driver/internal/handlers/AsyncInitResponseHandler.java

Lines changed: 12 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919
package org.neo4j.driver.internal.handlers;
2020

2121
import io.netty.channel.Channel;
22-
import io.netty.channel.ChannelFuture;
23-
import io.netty.channel.ChannelFutureListener;
2422
import io.netty.channel.ChannelPipeline;
2523
import io.netty.channel.ChannelPromise;
2624

@@ -32,7 +30,6 @@
3230
import org.neo4j.driver.v1.Value;
3331

3432
import static org.neo4j.driver.internal.async.ChannelAttributes.setServerVersion;
35-
import static org.neo4j.driver.internal.util.ServerVersion.version;
3633

3734
public class AsyncInitResponseHandler implements ResponseHandler
3835
{
@@ -50,14 +47,9 @@ public void onSuccess( Map<String,Value> metadata )
5047
{
5148
try
5249
{
53-
Value versionValue = metadata.get( "server" );
54-
if ( versionValue != null )
55-
{
56-
String versionString = versionValue.asString();
57-
ServerVersion version = version( versionString );
58-
setServerVersion( channel, version );
59-
updatePipelineIfNeeded( version, channel.pipeline() );
60-
}
50+
ServerVersion serverVersion = extractServerVersion( metadata );
51+
setServerVersion( channel, serverVersion );
52+
updatePipelineIfNeeded( serverVersion, channel.pipeline() );
6153
connectionInitializedPromise.setSuccess();
6254
}
6355
catch ( Throwable error )
@@ -68,16 +60,9 @@ public void onSuccess( Map<String,Value> metadata )
6860
}
6961

7062
@Override
71-
public void onFailure( final Throwable error )
63+
public void onFailure( Throwable error )
7264
{
73-
channel.close().addListener( new ChannelFutureListener()
74-
{
75-
@Override
76-
public void operationComplete( ChannelFuture future ) throws Exception
77-
{
78-
connectionInitializedPromise.setFailure( error );
79-
}
80-
} );
65+
channel.close().addListener( future -> connectionInitializedPromise.setFailure( error ) );
8166
}
8267

8368
@Override
@@ -86,6 +71,13 @@ public void onRecord( Value[] fields )
8671
throw new UnsupportedOperationException();
8772
}
8873

74+
private static ServerVersion extractServerVersion( Map<String,Value> metadata )
75+
{
76+
Value versionValue = metadata.get( "server" );
77+
boolean versionAbsent = versionValue == null || versionValue.isNull();
78+
return versionAbsent ? ServerVersion.v3_0_0 : ServerVersion.version( versionValue.asString() );
79+
}
80+
8981
private static void updatePipelineIfNeeded( ServerVersion serverVersion, ChannelPipeline pipeline )
9082
{
9183
if ( serverVersion.lessThan( ServerVersion.v3_2_0 ) )

driver/src/test/java/org/neo4j/driver/internal/async/AsyncConnectorImplTest.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import io.netty.bootstrap.Bootstrap;
2222
import io.netty.channel.Channel;
2323
import io.netty.channel.ChannelFuture;
24-
import io.netty.channel.pool.ChannelPoolHandler;
2524
import org.junit.After;
2625
import org.junit.Before;
2726
import org.junit.Rule;
@@ -48,7 +47,6 @@
4847
import static org.junit.Assert.assertThat;
4948
import static org.junit.Assert.assertTrue;
5049
import static org.junit.Assert.fail;
51-
import static org.mockito.Mockito.mock;
5250
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
5351
import static org.neo4j.driver.v1.util.TestUtil.await;
5452

@@ -70,7 +68,7 @@ public void tearDown() throws Exception
7068
{
7169
if ( bootstrap != null )
7270
{
73-
bootstrap.config().group().shutdownGracefully();
71+
bootstrap.config().group().shutdownGracefully().syncUninterruptibly();
7472
}
7573
}
7674

@@ -161,7 +159,6 @@ private AsyncConnectorImpl newConnector( AuthToken authToken ) throws Exception
161159
private AsyncConnectorImpl newConnector( AuthToken authToken, int connectTimeoutMillis ) throws Exception
162160
{
163161
ConnectionSettings settings = new ConnectionSettings( authToken, 1000 );
164-
return new AsyncConnectorImpl( settings, SecurityPlan.forAllCertificates(),
165-
mock( ChannelPoolHandler.class ), DEV_NULL_LOGGING, new FakeClock() );
162+
return new AsyncConnectorImpl( settings, SecurityPlan.forAllCertificates(), DEV_NULL_LOGGING, new FakeClock() );
166163
}
167164
}

driver/src/test/java/org/neo4j/driver/internal/async/HandshakeResponseHandlerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,13 @@ public class HandshakeResponseHandlerTest
5151
private final EmbeddedChannel channel = new EmbeddedChannel();
5252

5353
@Before
54-
public void setUp() throws Exception
54+
public void setUp()
5555
{
5656
setMessageDispatcher( channel, new InboundMessageDispatcher( channel, DEV_NULL_LOGGING ) );
5757
}
5858

5959
@After
60-
public void tearDown() throws Exception
60+
public void tearDown()
6161
{
6262
channel.close();
6363
}

driver/src/test/java/org/neo4j/driver/internal/async/NettyChannelInitializerTest.java

Lines changed: 10 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.neo4j.driver.internal.async;
2020

2121
import io.netty.channel.embedded.EmbeddedChannel;
22-
import io.netty.channel.pool.ChannelPoolHandler;
2322
import io.netty.handler.ssl.SslHandler;
2423
import org.junit.Test;
2524

@@ -31,21 +30,19 @@
3130
import static org.junit.Assert.assertNotNull;
3231
import static org.junit.Assert.assertNull;
3332
import static org.mockito.Mockito.mock;
34-
import static org.mockito.Mockito.verify;
3533
import static org.mockito.Mockito.when;
36-
import static org.neo4j.driver.internal.async.ChannelAttributes.address;
3734
import static org.neo4j.driver.internal.async.ChannelAttributes.creationTimestamp;
3835
import static org.neo4j.driver.internal.async.ChannelAttributes.messageDispatcher;
36+
import static org.neo4j.driver.internal.async.ChannelAttributes.serverAddress;
3937
import static org.neo4j.driver.internal.net.BoltServerAddress.LOCAL_DEFAULT;
4038

4139
public class NettyChannelInitializerTest
4240
{
4341
@Test
4442
public void shouldAddSslHandlerWhenRequiresEncryption() throws Exception
4543
{
46-
SecurityPlan securityPlan = SecurityPlan.forAllCertificates();
47-
NettyChannelInitializer initializer = new NettyChannelInitializer( LOCAL_DEFAULT, securityPlan,
48-
mock( ChannelPoolHandler.class ), new FakeClock() );
44+
SecurityPlan security = SecurityPlan.forAllCertificates();
45+
NettyChannelInitializer initializer = new NettyChannelInitializer( LOCAL_DEFAULT, security, new FakeClock() );
4946

5047
EmbeddedChannel channel = new EmbeddedChannel();
5148
initializer.initChannel( channel );
@@ -54,11 +51,10 @@ public void shouldAddSslHandlerWhenRequiresEncryption() throws Exception
5451
}
5552

5653
@Test
57-
public void shouldNotAddSslHandlerWhenDoesNotRequireEncryption() throws Exception
54+
public void shouldNotAddSslHandlerWhenDoesNotRequireEncryption()
5855
{
59-
SecurityPlan securityPlan = SecurityPlan.insecure();
60-
NettyChannelInitializer initializer = new NettyChannelInitializer( LOCAL_DEFAULT, securityPlan,
61-
mock( ChannelPoolHandler.class ), new FakeClock() );
56+
SecurityPlan security = SecurityPlan.insecure();
57+
NettyChannelInitializer initializer = new NettyChannelInitializer( LOCAL_DEFAULT, security, new FakeClock() );
6258

6359
EmbeddedChannel channel = new EmbeddedChannel();
6460
initializer.initChannel( channel );
@@ -67,31 +63,18 @@ public void shouldNotAddSslHandlerWhenDoesNotRequireEncryption() throws Exceptio
6763
}
6864

6965
@Test
70-
public void shouldUpdateChannelAttributes() throws Exception
66+
public void shouldUpdateChannelAttributes()
7167
{
7268
Clock clock = mock( Clock.class );
7369
when( clock.millis() ).thenReturn( 42L );
74-
NettyChannelInitializer initializer = new NettyChannelInitializer( LOCAL_DEFAULT, SecurityPlan.insecure(),
75-
mock( ChannelPoolHandler.class ), clock );
70+
SecurityPlan security = SecurityPlan.insecure();
71+
NettyChannelInitializer initializer = new NettyChannelInitializer( LOCAL_DEFAULT, security, clock );
7672

7773
EmbeddedChannel channel = new EmbeddedChannel();
7874
initializer.initChannel( channel );
7975

80-
assertEquals( LOCAL_DEFAULT, address( channel ) );
76+
assertEquals( LOCAL_DEFAULT, serverAddress( channel ) );
8177
assertEquals( 42L, creationTimestamp( channel ) );
8278
assertNotNull( messageDispatcher( channel ) );
8379
}
84-
85-
@Test
86-
public void shouldNotifyPoolHandlerAboutCreatedConnection() throws Exception
87-
{
88-
ChannelPoolHandler poolHandler = mock( ChannelPoolHandler.class );
89-
NettyChannelInitializer initializer = new NettyChannelInitializer( LOCAL_DEFAULT, SecurityPlan.insecure(),
90-
poolHandler, new FakeClock() );
91-
92-
EmbeddedChannel channel = new EmbeddedChannel();
93-
initializer.initChannel( channel );
94-
95-
verify( poolHandler ).channelCreated( channel );
96-
}
9780
}

driver/src/test/java/org/neo4j/driver/internal/async/pool/AsyncConnectionPoolImplTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -167,11 +167,10 @@ private AsyncConnectionPoolImpl newPool() throws Exception
167167
{
168168
FakeClock clock = new FakeClock();
169169
ConnectionSettings connectionSettings = new ConnectionSettings( neo4j.authToken(), 5000 );
170-
ActiveChannelTracker poolHandler = new ActiveChannelTracker( DEV_NULL_LOGGING );
171170
AsyncConnectorImpl connector = new AsyncConnectorImpl( connectionSettings, SecurityPlan.forAllCertificates(),
172-
poolHandler, DEV_NULL_LOGGING, clock );
171+
DEV_NULL_LOGGING, clock );
173172
PoolSettings poolSettings = new PoolSettings( 5, -1, -1, 10, 5000 );
174173
Bootstrap bootstrap = BootstrapFactory.newBootstrap( 1 );
175-
return new AsyncConnectionPoolImpl( connector, bootstrap, poolHandler, poolSettings, DEV_NULL_LOGGING, clock );
174+
return new AsyncConnectionPoolImpl( connector, bootstrap, poolSettings, DEV_NULL_LOGGING, clock );
176175
}
177176
}

0 commit comments

Comments
 (0)