Skip to content

Handle Netty channel connection failures #1574

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.neo4j.driver.internal.bolt.api.exception.MinVersionAcquisitionException;
import org.neo4j.driver.internal.bolt.basicimpl.messaging.v4.BoltProtocolV4;
import org.neo4j.driver.internal.bolt.basicimpl.messaging.v51.BoltProtocolV51;
import org.neo4j.driver.internal.bolt.basicimpl.util.FutureUtil;

public final class NettyBoltConnectionProvider implements BoltConnectionProvider {
private final LoggingProvider logging;
Expand Down Expand Up @@ -149,6 +150,7 @@ public CompletionStage<BoltConnection> connect(
})
.handle((connection, throwable) -> {
if (throwable != null) {
throwable = FutureUtil.completionExceptionCause(throwable);
log.log(System.Logger.Level.DEBUG, "Failed to establish BoltConnection " + address, throwable);
throw new CompletionException(throwable);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@
import static java.util.Objects.requireNonNull;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
Expand All @@ -47,10 +46,10 @@
import org.neo4j.driver.internal.bolt.basicimpl.async.NetworkConnection;
import org.neo4j.driver.internal.bolt.basicimpl.async.connection.ChannelConnectedListener;
import org.neo4j.driver.internal.bolt.basicimpl.async.connection.ChannelPipelineBuilderImpl;
import org.neo4j.driver.internal.bolt.basicimpl.async.connection.HandshakeCompletedListener;
import org.neo4j.driver.internal.bolt.basicimpl.async.connection.NettyChannelInitializer;
import org.neo4j.driver.internal.bolt.basicimpl.async.connection.NettyDomainNameResolverGroup;
import org.neo4j.driver.internal.bolt.basicimpl.async.inbound.ConnectTimeoutHandler;
import org.neo4j.driver.internal.bolt.basicimpl.messaging.BoltProtocol;
import org.neo4j.driver.internal.bolt.basicimpl.spi.Connection;

public final class NettyConnectionProvider implements ConnectionProvider {
Expand All @@ -69,7 +68,7 @@ public NettyConnectionProvider(
LocalAddress localAddress,
LoggingProvider logging) {
this.eventLoopGroup = eventLoopGroup;
this.clock = clock;
this.clock = requireNonNull(clock);
this.domainNameResolver = requireNonNull(domainNameResolver);
this.addressResolverGroup = new NettyDomainNameResolverGroup(this.domainNameResolver);
this.localAddress = localAddress;
Expand Down Expand Up @@ -111,42 +110,22 @@ public CompletionStage<Connection> acquireConnection(
socketAddress = localAddress;
}

var connectedFuture = bootstrap.connect(socketAddress);

var channel = connectedFuture.channel();
var handshakeCompleted = channel.newPromise();
var connectionInitialized = channel.newPromise();

installChannelConnectedListeners(address, connectedFuture, handshakeCompleted, connectTimeoutMillis);
installHandshakeCompletedListeners(
handshakeCompleted,
connectionInitialized,
address,
routingContext,
authMap,
boltAgent,
userAgent,
latestAuthMillisFuture,
notificationConfig);

var future = new CompletableFuture<Connection>();
connectionInitialized.addListener((ChannelFutureListener) f -> {
var throwable = f.cause();
if (throwable != null) {
future.completeExceptionally(throwable);
} else {
var connection = new NetworkConnection(channel, logging);
future.complete(connection);
}
});
return future;
return installChannelConnectedListeners(address, bootstrap.connect(socketAddress), connectTimeoutMillis)
.thenCompose(channel -> BoltProtocol.forChannel(channel)
.initializeChannel(
channel,
requireNonNull(userAgent),
requireNonNull(boltAgent),
authMap,
routingContext,
notificationConfig,
clock,
latestAuthMillisFuture))
.thenApply(channel -> new NetworkConnection(channel, logging));
}

private void installChannelConnectedListeners(
BoltServerAddress address,
ChannelFuture channelConnected,
ChannelPromise handshakeCompleted,
int connectTimeoutMillis) {
private CompletionStage<Channel> installChannelConnectedListeners(
BoltServerAddress address, ChannelFuture channelConnected, int connectTimeoutMillis) {
var pipeline = channelConnected.channel().pipeline();

// add timeout handler to the pipeline when channel is connected. it's needed to
Expand All @@ -156,42 +135,16 @@ private void installChannelConnectedListeners(
channelConnected.addListener(future -> pipeline.addFirst(new ConnectTimeoutHandler(connectTimeoutMillis)));

// add listener that sends Bolt handshake bytes when channel is connected
var handshakeCompleted = new CompletableFuture<Channel>();
channelConnected.addListener(
new ChannelConnectedListener(address, new ChannelPipelineBuilderImpl(), handshakeCompleted, logging));
}

private void installHandshakeCompletedListeners(
ChannelPromise handshakeCompleted,
ChannelPromise connectionInitialized,
BoltServerAddress address,
RoutingContext routingContext,
Map<String, Value> authMap,
BoltAgent boltAgent,
String userAgent,
CompletableFuture<Long> latestAuthMillisFuture,
NotificationConfig notificationConfig) {
var pipeline = handshakeCompleted.channel().pipeline();

// remove timeout handler from the pipeline once TLS and Bolt handshakes are
// completed. regular protocol
// messages will flow next and we do not want to have read timeout for them
handshakeCompleted.addListener(future -> {
if (future.isSuccess()) {
pipeline.remove(ConnectTimeoutHandler.class);
return handshakeCompleted.whenComplete((channel, throwable) -> {
if (throwable == null) {
// remove timeout handler from the pipeline once TLS and Bolt handshakes are
// completed. regular protocol
// messages will flow next and we do not want to have read timeout for them
channel.pipeline().remove(ConnectTimeoutHandler.class);
}
});

// add listener that sends an INIT message. connection is now fully established.
// channel pipeline is fully
// set to send/receive messages for a selected protocol version
handshakeCompleted.addListener(new HandshakeCompletedListener(
authMap,
userAgent,
boltAgent,
routingContext,
connectionInitialized,
notificationConfig,
this.clock,
latestAuthMillisFuture));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
import static java.lang.String.format;
import static org.neo4j.driver.internal.bolt.basicimpl.async.connection.BoltProtocolUtil.handshakeString;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPromise;
import java.util.concurrent.CompletableFuture;
import javax.net.ssl.SSLHandshakeException;
import org.neo4j.driver.exceptions.SecurityException;
import org.neo4j.driver.exceptions.ServiceUnavailableException;
Expand All @@ -32,30 +33,29 @@
public class ChannelConnectedListener implements ChannelFutureListener {
private final BoltServerAddress address;
private final ChannelPipelineBuilder pipelineBuilder;
private final ChannelPromise handshakeCompletedPromise;
private final CompletableFuture<Channel> handshakeCompletedFuture;
private final LoggingProvider logging;

public ChannelConnectedListener(
BoltServerAddress address,
ChannelPipelineBuilder pipelineBuilder,
ChannelPromise handshakeCompletedPromise,
CompletableFuture<Channel> handshakeCompletedFuture,
LoggingProvider logging) {
this.address = address;
this.pipelineBuilder = pipelineBuilder;
this.handshakeCompletedPromise = handshakeCompletedPromise;
this.handshakeCompletedFuture = handshakeCompletedFuture;
this.logging = logging;
}

@Override
public void operationComplete(ChannelFuture future) {
var channel = future.channel();
var log = new ChannelActivityLogger(channel, logging, getClass());

if (future.isSuccess()) {
var channel = future.channel();
var log = new ChannelActivityLogger(channel, logging, getClass());
log.log(System.Logger.Level.TRACE, "Channel %s connected, initiating bolt handshake", channel);

var pipeline = channel.pipeline();
pipeline.addLast(new HandshakeHandler(pipelineBuilder, handshakeCompletedPromise, logging));
pipeline.addLast(new HandshakeHandler(pipelineBuilder, handshakeCompletedFuture, logging));
log.log(System.Logger.Level.DEBUG, "C: [Bolt Handshake] %s", handshakeString());
channel.writeAndFlush(BoltProtocolUtil.handshakeBuf()).addListener(f -> {
if (!f.isSuccess()) {
Expand All @@ -66,11 +66,11 @@ public void operationComplete(ChannelFuture future) {
error = new ServiceUnavailableException(
String.format("Unable to write Bolt handshake to %s.", this.address), error);
}
this.handshakeCompletedPromise.setFailure(error);
this.handshakeCompletedFuture.completeExceptionally(error);
}
});
} else {
handshakeCompletedPromise.setFailure(databaseUnavailableError(address, future.cause()));
handshakeCompletedFuture.completeExceptionally(databaseUnavailableError(address, future.cause()));
}
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@
import static org.neo4j.driver.internal.bolt.api.BoltProtocolVersion.isHttp;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.ReplayingDecoder;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import javax.net.ssl.SSLHandshakeException;
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.exceptions.SecurityException;
Expand All @@ -39,17 +40,19 @@

public class HandshakeHandler extends ReplayingDecoder<Void> {
private final ChannelPipelineBuilder pipelineBuilder;
private final ChannelPromise handshakeCompletedPromise;
private final CompletableFuture<Channel> handshakeCompletedFuture;
private final LoggingProvider logging;

private boolean failed;
private ChannelActivityLogger log;
private ChannelErrorLogger errorLog;

public HandshakeHandler(
ChannelPipelineBuilder pipelineBuilder, ChannelPromise handshakeCompletedPromise, LoggingProvider logging) {
ChannelPipelineBuilder pipelineBuilder,
CompletableFuture<Channel> handshakeCompletedFuture,
LoggingProvider logging) {
this.pipelineBuilder = pipelineBuilder;
this.handshakeCompletedPromise = handshakeCompletedPromise;
this.handshakeCompletedFuture = handshakeCompletedFuture;
this.logging = logging;
}

Expand Down Expand Up @@ -114,7 +117,7 @@ private BoltProtocol protocolForVersion(BoltProtocolVersion version) {
private void protocolSelected(BoltProtocolVersion version, MessageFormat messageFormat, ChannelHandlerContext ctx) {
ChannelAttributes.setProtocolVersion(ctx.channel(), version);
pipelineBuilder.build(messageFormat, ctx.pipeline(), logging);
handshakeCompletedPromise.setSuccess();
handshakeCompletedFuture.complete(ctx.channel());
}

private void handleUnknownSuggestedProtocolVersion(BoltProtocolVersion version, ChannelHandlerContext ctx) {
Expand All @@ -128,7 +131,7 @@ private void handleUnknownSuggestedProtocolVersion(BoltProtocolVersion version,
}

private void fail(ChannelHandlerContext ctx, Throwable error) {
ctx.close().addListener(future -> handshakeCompletedPromise.tryFailure(error));
ctx.close().addListener(future -> handshakeCompletedFuture.completeExceptionally(error));
}

private static Throwable protocolNoSupportedByServerError() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import static org.neo4j.driver.internal.bolt.basicimpl.async.connection.ChannelAttributes.protocolVersion;

import io.netty.channel.Channel;
import io.netty.channel.ChannelPromise;
import java.time.Clock;
import java.time.Duration;
import java.util.Map;
Expand Down Expand Up @@ -59,12 +58,12 @@
public interface BoltProtocol {
MessageFormat createMessageFormat();

void initializeChannel(
CompletionStage<Channel> initializeChannel(
Channel channel,
String userAgent,
BoltAgent boltAgent,
Map<String, Value> authMap,
RoutingContext routingContext,
ChannelPromise channelInitializedPromise,
NotificationConfig notificationConfig,
Clock clock,
CompletableFuture<Long> latestAuthMillisFuture);
Expand Down
Loading