Skip to content

Commit 48becf7

Browse files
authored
Handle Netty channel connection failures (#1574)
* Handle Netty channel connection failures: this update fixes a bug where connection acquisition may hang because the `ChannelPromise` of a `FailedChannel` is never completed.
* Update following review
* Update following review
1 parent 902c7ea commit 48becf7

26 files changed

+561
-650
lines changed

driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/NettyBoltConnectionProvider.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.neo4j.driver.internal.bolt.api.exception.MinVersionAcquisitionException;
4848
import org.neo4j.driver.internal.bolt.basicimpl.messaging.v4.BoltProtocolV4;
4949
import org.neo4j.driver.internal.bolt.basicimpl.messaging.v51.BoltProtocolV51;
50+
import org.neo4j.driver.internal.bolt.basicimpl.util.FutureUtil;
5051

5152
public final class NettyBoltConnectionProvider implements BoltConnectionProvider {
5253
private final LoggingProvider logging;
@@ -149,6 +150,7 @@ public CompletionStage<BoltConnection> connect(
149150
})
150151
.handle((connection, throwable) -> {
151152
if (throwable != null) {
153+
throwable = FutureUtil.completionExceptionCause(throwable);
152154
log.log(System.Logger.Level.DEBUG, "Failed to establish BoltConnection " + address, throwable);
153155
throw new CompletionException(throwable);
154156
} else {

driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/NettyConnectionProvider.java

Lines changed: 24 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,9 @@
1919
import static java.util.Objects.requireNonNull;
2020

2121
import io.netty.bootstrap.Bootstrap;
22+
import io.netty.channel.Channel;
2223
import io.netty.channel.ChannelFuture;
23-
import io.netty.channel.ChannelFutureListener;
2424
import io.netty.channel.ChannelOption;
25-
import io.netty.channel.ChannelPromise;
2625
import io.netty.channel.EventLoopGroup;
2726
import io.netty.channel.local.LocalAddress;
2827
import io.netty.channel.local.LocalChannel;
@@ -47,10 +46,10 @@
4746
import org.neo4j.driver.internal.bolt.basicimpl.async.NetworkConnection;
4847
import org.neo4j.driver.internal.bolt.basicimpl.async.connection.ChannelConnectedListener;
4948
import org.neo4j.driver.internal.bolt.basicimpl.async.connection.ChannelPipelineBuilderImpl;
50-
import org.neo4j.driver.internal.bolt.basicimpl.async.connection.HandshakeCompletedListener;
5149
import org.neo4j.driver.internal.bolt.basicimpl.async.connection.NettyChannelInitializer;
5250
import org.neo4j.driver.internal.bolt.basicimpl.async.connection.NettyDomainNameResolverGroup;
5351
import org.neo4j.driver.internal.bolt.basicimpl.async.inbound.ConnectTimeoutHandler;
52+
import org.neo4j.driver.internal.bolt.basicimpl.messaging.BoltProtocol;
5453
import org.neo4j.driver.internal.bolt.basicimpl.spi.Connection;
5554

5655
public final class NettyConnectionProvider implements ConnectionProvider {
@@ -69,7 +68,7 @@ public NettyConnectionProvider(
6968
LocalAddress localAddress,
7069
LoggingProvider logging) {
7170
this.eventLoopGroup = eventLoopGroup;
72-
this.clock = clock;
71+
this.clock = requireNonNull(clock);
7372
this.domainNameResolver = requireNonNull(domainNameResolver);
7473
this.addressResolverGroup = new NettyDomainNameResolverGroup(this.domainNameResolver);
7574
this.localAddress = localAddress;
@@ -111,42 +110,22 @@ public CompletionStage<Connection> acquireConnection(
111110
socketAddress = localAddress;
112111
}
113112

114-
var connectedFuture = bootstrap.connect(socketAddress);
115-
116-
var channel = connectedFuture.channel();
117-
var handshakeCompleted = channel.newPromise();
118-
var connectionInitialized = channel.newPromise();
119-
120-
installChannelConnectedListeners(address, connectedFuture, handshakeCompleted, connectTimeoutMillis);
121-
installHandshakeCompletedListeners(
122-
handshakeCompleted,
123-
connectionInitialized,
124-
address,
125-
routingContext,
126-
authMap,
127-
boltAgent,
128-
userAgent,
129-
latestAuthMillisFuture,
130-
notificationConfig);
131-
132-
var future = new CompletableFuture<Connection>();
133-
connectionInitialized.addListener((ChannelFutureListener) f -> {
134-
var throwable = f.cause();
135-
if (throwable != null) {
136-
future.completeExceptionally(throwable);
137-
} else {
138-
var connection = new NetworkConnection(channel, logging);
139-
future.complete(connection);
140-
}
141-
});
142-
return future;
113+
return installChannelConnectedListeners(address, bootstrap.connect(socketAddress), connectTimeoutMillis)
114+
.thenCompose(channel -> BoltProtocol.forChannel(channel)
115+
.initializeChannel(
116+
channel,
117+
requireNonNull(userAgent),
118+
requireNonNull(boltAgent),
119+
authMap,
120+
routingContext,
121+
notificationConfig,
122+
clock,
123+
latestAuthMillisFuture))
124+
.thenApply(channel -> new NetworkConnection(channel, logging));
143125
}
144126

145-
private void installChannelConnectedListeners(
146-
BoltServerAddress address,
147-
ChannelFuture channelConnected,
148-
ChannelPromise handshakeCompleted,
149-
int connectTimeoutMillis) {
127+
private CompletionStage<Channel> installChannelConnectedListeners(
128+
BoltServerAddress address, ChannelFuture channelConnected, int connectTimeoutMillis) {
150129
var pipeline = channelConnected.channel().pipeline();
151130

152131
// add timeout handler to the pipeline when channel is connected. it's needed to
@@ -156,42 +135,16 @@ private void installChannelConnectedListeners(
156135
channelConnected.addListener(future -> pipeline.addFirst(new ConnectTimeoutHandler(connectTimeoutMillis)));
157136

158137
// add listener that sends Bolt handshake bytes when channel is connected
138+
var handshakeCompleted = new CompletableFuture<Channel>();
159139
channelConnected.addListener(
160140
new ChannelConnectedListener(address, new ChannelPipelineBuilderImpl(), handshakeCompleted, logging));
161-
}
162-
163-
private void installHandshakeCompletedListeners(
164-
ChannelPromise handshakeCompleted,
165-
ChannelPromise connectionInitialized,
166-
BoltServerAddress address,
167-
RoutingContext routingContext,
168-
Map<String, Value> authMap,
169-
BoltAgent boltAgent,
170-
String userAgent,
171-
CompletableFuture<Long> latestAuthMillisFuture,
172-
NotificationConfig notificationConfig) {
173-
var pipeline = handshakeCompleted.channel().pipeline();
174-
175-
// remove timeout handler from the pipeline once TLS and Bolt handshakes are
176-
// completed. regular protocol
177-
// messages will flow next and we do not want to have read timeout for them
178-
handshakeCompleted.addListener(future -> {
179-
if (future.isSuccess()) {
180-
pipeline.remove(ConnectTimeoutHandler.class);
141+
return handshakeCompleted.whenComplete((channel, throwable) -> {
142+
if (throwable == null) {
143+
// remove timeout handler from the pipeline once TLS and Bolt handshakes are
144+
// completed. regular protocol
145+
// messages will flow next and we do not want to have read timeout for them
146+
channel.pipeline().remove(ConnectTimeoutHandler.class);
181147
}
182148
});
183-
184-
// add listener that sends an INIT message. connection is now fully established.
185-
// channel pipeline is fully
186-
// set to send/receive messages for a selected protocol version
187-
handshakeCompleted.addListener(new HandshakeCompletedListener(
188-
authMap,
189-
userAgent,
190-
boltAgent,
191-
routingContext,
192-
connectionInitialized,
193-
notificationConfig,
194-
this.clock,
195-
latestAuthMillisFuture));
196149
}
197150
}

driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/async/connection/ChannelConnectedListener.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@
1919
import static java.lang.String.format;
2020
import static org.neo4j.driver.internal.bolt.basicimpl.async.connection.BoltProtocolUtil.handshakeString;
2121

22+
import io.netty.channel.Channel;
2223
import io.netty.channel.ChannelFuture;
2324
import io.netty.channel.ChannelFutureListener;
24-
import io.netty.channel.ChannelPromise;
25+
import java.util.concurrent.CompletableFuture;
2526
import javax.net.ssl.SSLHandshakeException;
2627
import org.neo4j.driver.exceptions.SecurityException;
2728
import org.neo4j.driver.exceptions.ServiceUnavailableException;
@@ -32,30 +33,29 @@
3233
public class ChannelConnectedListener implements ChannelFutureListener {
3334
private final BoltServerAddress address;
3435
private final ChannelPipelineBuilder pipelineBuilder;
35-
private final ChannelPromise handshakeCompletedPromise;
36+
private final CompletableFuture<Channel> handshakeCompletedFuture;
3637
private final LoggingProvider logging;
3738

3839
public ChannelConnectedListener(
3940
BoltServerAddress address,
4041
ChannelPipelineBuilder pipelineBuilder,
41-
ChannelPromise handshakeCompletedPromise,
42+
CompletableFuture<Channel> handshakeCompletedFuture,
4243
LoggingProvider logging) {
4344
this.address = address;
4445
this.pipelineBuilder = pipelineBuilder;
45-
this.handshakeCompletedPromise = handshakeCompletedPromise;
46+
this.handshakeCompletedFuture = handshakeCompletedFuture;
4647
this.logging = logging;
4748
}
4849

4950
@Override
5051
public void operationComplete(ChannelFuture future) {
51-
var channel = future.channel();
52-
var log = new ChannelActivityLogger(channel, logging, getClass());
53-
5452
if (future.isSuccess()) {
53+
var channel = future.channel();
54+
var log = new ChannelActivityLogger(channel, logging, getClass());
5555
log.log(System.Logger.Level.TRACE, "Channel %s connected, initiating bolt handshake", channel);
5656

5757
var pipeline = channel.pipeline();
58-
pipeline.addLast(new HandshakeHandler(pipelineBuilder, handshakeCompletedPromise, logging));
58+
pipeline.addLast(new HandshakeHandler(pipelineBuilder, handshakeCompletedFuture, logging));
5959
log.log(System.Logger.Level.DEBUG, "C: [Bolt Handshake] %s", handshakeString());
6060
channel.writeAndFlush(BoltProtocolUtil.handshakeBuf()).addListener(f -> {
6161
if (!f.isSuccess()) {
@@ -66,11 +66,11 @@ public void operationComplete(ChannelFuture future) {
6666
error = new ServiceUnavailableException(
6767
String.format("Unable to write Bolt handshake to %s.", this.address), error);
6868
}
69-
this.handshakeCompletedPromise.setFailure(error);
69+
this.handshakeCompletedFuture.completeExceptionally(error);
7070
}
7171
});
7272
} else {
73-
handshakeCompletedPromise.setFailure(databaseUnavailableError(address, future.cause()));
73+
handshakeCompletedFuture.completeExceptionally(databaseUnavailableError(address, future.cause()));
7474
}
7575
}
7676

driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/async/connection/HandshakeCompletedListener.java

Lines changed: 0 additions & 80 deletions
This file was deleted.

driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/async/connection/HandshakeHandler.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,12 @@
1919
import static org.neo4j.driver.internal.bolt.api.BoltProtocolVersion.isHttp;
2020

2121
import io.netty.buffer.ByteBuf;
22+
import io.netty.channel.Channel;
2223
import io.netty.channel.ChannelHandlerContext;
23-
import io.netty.channel.ChannelPromise;
2424
import io.netty.handler.codec.DecoderException;
2525
import io.netty.handler.codec.ReplayingDecoder;
2626
import java.util.List;
27+
import java.util.concurrent.CompletableFuture;
2728
import javax.net.ssl.SSLHandshakeException;
2829
import org.neo4j.driver.exceptions.ClientException;
2930
import org.neo4j.driver.exceptions.SecurityException;
@@ -39,17 +40,19 @@
3940

4041
public class HandshakeHandler extends ReplayingDecoder<Void> {
4142
private final ChannelPipelineBuilder pipelineBuilder;
42-
private final ChannelPromise handshakeCompletedPromise;
43+
private final CompletableFuture<Channel> handshakeCompletedFuture;
4344
private final LoggingProvider logging;
4445

4546
private boolean failed;
4647
private ChannelActivityLogger log;
4748
private ChannelErrorLogger errorLog;
4849

4950
public HandshakeHandler(
50-
ChannelPipelineBuilder pipelineBuilder, ChannelPromise handshakeCompletedPromise, LoggingProvider logging) {
51+
ChannelPipelineBuilder pipelineBuilder,
52+
CompletableFuture<Channel> handshakeCompletedFuture,
53+
LoggingProvider logging) {
5154
this.pipelineBuilder = pipelineBuilder;
52-
this.handshakeCompletedPromise = handshakeCompletedPromise;
55+
this.handshakeCompletedFuture = handshakeCompletedFuture;
5356
this.logging = logging;
5457
}
5558

@@ -114,7 +117,7 @@ private BoltProtocol protocolForVersion(BoltProtocolVersion version) {
114117
private void protocolSelected(BoltProtocolVersion version, MessageFormat messageFormat, ChannelHandlerContext ctx) {
115118
ChannelAttributes.setProtocolVersion(ctx.channel(), version);
116119
pipelineBuilder.build(messageFormat, ctx.pipeline(), logging);
117-
handshakeCompletedPromise.setSuccess();
120+
handshakeCompletedFuture.complete(ctx.channel());
118121
}
119122

120123
private void handleUnknownSuggestedProtocolVersion(BoltProtocolVersion version, ChannelHandlerContext ctx) {
@@ -128,7 +131,7 @@ private void handleUnknownSuggestedProtocolVersion(BoltProtocolVersion version,
128131
}
129132

130133
private void fail(ChannelHandlerContext ctx, Throwable error) {
131-
ctx.close().addListener(future -> handshakeCompletedPromise.tryFailure(error));
134+
ctx.close().addListener(future -> handshakeCompletedFuture.completeExceptionally(error));
132135
}
133136

134137
private static Throwable protocolNoSupportedByServerError() {

driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/BoltProtocol.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import static org.neo4j.driver.internal.bolt.basicimpl.async.connection.ChannelAttributes.protocolVersion;
2020

2121
import io.netty.channel.Channel;
22-
import io.netty.channel.ChannelPromise;
2322
import java.time.Clock;
2423
import java.time.Duration;
2524
import java.util.Map;
@@ -59,12 +58,12 @@
5958
public interface BoltProtocol {
6059
MessageFormat createMessageFormat();
6160

62-
void initializeChannel(
61+
CompletionStage<Channel> initializeChannel(
62+
Channel channel,
6363
String userAgent,
6464
BoltAgent boltAgent,
6565
Map<String, Value> authMap,
6666
RoutingContext routingContext,
67-
ChannelPromise channelInitializedPromise,
6867
NotificationConfig notificationConfig,
6968
Clock clock,
7069
CompletableFuture<Long> latestAuthMillisFuture);

0 commit comments

Comments
 (0)