Skip to content

Commit bda4df7

Browse files
committed
Fix Bolt handshake write handling and timeout management (neo4j#1528)
1 parent 84f62af commit bda4df7

File tree

5 files changed

+48
-7
lines changed

5 files changed

+48
-7
lines changed

driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/NettyConnectionProvider.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,11 @@ private void installHandshakeCompletedListeners(
166166
// remove timeout handler from the pipeline once TLS and Bolt handshakes are
167167
// completed. regular protocol
168168
// messages will flow next and we do not want to have read timeout for them
169-
handshakeCompleted.addListener(future -> pipeline.remove(ConnectTimeoutHandler.class));
169+
handshakeCompleted.addListener(future -> {
170+
if (future.isSuccess()) {
171+
pipeline.remove(ConnectTimeoutHandler.class);
172+
}
173+
});
170174

171175
// add listener that sends an INIT message. connection is now fully established.
172176
// channel pipeline is fully

driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/async/connection/ChannelConnectedListener.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,13 @@
1717
package org.neo4j.driver.internal.bolt.basicimpl.async.connection;
1818

1919
import static java.lang.String.format;
20-
import static org.neo4j.driver.internal.bolt.basicimpl.async.connection.BoltProtocolUtil.handshakeBuf;
2120
import static org.neo4j.driver.internal.bolt.basicimpl.async.connection.BoltProtocolUtil.handshakeString;
2221

2322
import io.netty.channel.ChannelFuture;
2423
import io.netty.channel.ChannelFutureListener;
2524
import io.netty.channel.ChannelPromise;
25+
import javax.net.ssl.SSLHandshakeException;
26+
import org.neo4j.driver.exceptions.SecurityException;
2627
import org.neo4j.driver.exceptions.ServiceUnavailableException;
2728
import org.neo4j.driver.internal.bolt.api.BoltServerAddress;
2829
import org.neo4j.driver.internal.bolt.api.LoggingProvider;
@@ -56,7 +57,18 @@ public void operationComplete(ChannelFuture future) {
5657
var pipeline = channel.pipeline();
5758
pipeline.addLast(new HandshakeHandler(pipelineBuilder, handshakeCompletedPromise, logging));
5859
log.log(System.Logger.Level.DEBUG, "C: [Bolt Handshake] %s", handshakeString());
59-
channel.writeAndFlush(handshakeBuf(), channel.voidPromise());
60+
channel.writeAndFlush(BoltProtocolUtil.handshakeBuf()).addListener(f -> {
61+
if (!f.isSuccess()) {
62+
var error = f.cause();
63+
if (error instanceof SSLHandshakeException) {
64+
error = new SecurityException("Failed to establish secured connection with the server", error);
65+
} else {
66+
error = new ServiceUnavailableException(
67+
String.format("Unable to write Bolt handshake to %s.", this.address), error);
68+
}
69+
this.handshakeCompletedPromise.setFailure(error);
70+
}
71+
});
6072
} else {
6173
handshakeCompletedPromise.setFailure(databaseUnavailableError(address, future.cause()));
6274
}

driver/src/test/java/org/neo4j/driver/GraphDatabaseTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.io.IOException;
3030
import java.net.ServerSocket;
3131
import java.net.URI;
32+
import org.junit.jupiter.api.Disabled;
3233
import org.junit.jupiter.api.Test;
3334
import org.neo4j.driver.exceptions.ServiceUnavailableException;
3435
import org.neo4j.driver.internal.security.StaticAuthTokenManager;
@@ -95,6 +96,7 @@ void shouldFailToCreateUnencryptedDriverWhenServerDoesNotRespond() throws IOExce
9596
}
9697

9798
@Test
99+
@Disabled("TLS actually fails, the test setup is not valid")
98100
void shouldFailToCreateEncryptedDriverWhenServerDoesNotRespond() throws IOException {
99101
testFailureWhenServerDoesNotRespond(true);
100102
}

driver/src/test/java/org/neo4j/driver/integration/EncryptionIT.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ void shouldOperateWithEncryptionWhenItIsOptionalInTheDatabase() {
5353

5454
@Test
5555
void shouldFailWithoutEncryptionWhenItIsRequiredInTheDatabase() {
56-
testMismatchingEncryption(BoltTlsLevel.REQUIRED, false);
56+
testMismatchingEncryption(BoltTlsLevel.REQUIRED, false, "Connection to the database terminated");
5757
}
5858

5959
@Test
@@ -68,7 +68,7 @@ void shouldOperateWithEncryptionWhenConfiguredUsingBoltSscURI() {
6868

6969
@Test
7070
void shouldFailWithEncryptionWhenItIsDisabledInTheDatabase() {
71-
testMismatchingEncryption(BoltTlsLevel.DISABLED, true);
71+
testMismatchingEncryption(BoltTlsLevel.DISABLED, true, "Unable to write Bolt handshake to");
7272
}
7373

7474
@Test
@@ -104,7 +104,7 @@ var record = result.next();
104104
}
105105
}
106106

107-
private void testMismatchingEncryption(BoltTlsLevel tlsLevel, boolean driverEncrypted) {
107+
private void testMismatchingEncryption(BoltTlsLevel tlsLevel, boolean driverEncrypted, String errorMessage) {
108108
Map<String, String> tlsConfig = new HashMap<>();
109109
tlsConfig.put(Neo4jSettings.BOLT_TLS_LEVEL, tlsLevel.toString());
110110
neo4j.deleteAndStartNeo4j(tlsConfig);
@@ -115,7 +115,7 @@ private void testMismatchingEncryption(BoltTlsLevel tlsLevel, boolean driverEncr
115115
neo4j.uri(), neo4j.authTokenManager(), config)
116116
.verifyConnectivity());
117117

118-
assertThat(e.getMessage(), startsWith("Connection to the database terminated"));
118+
assertThat(e.getMessage(), startsWith(errorMessage));
119119
}
120120

121121
private static Config newConfig(boolean withEncryption) {

driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/async/connection/ChannelConnectedListenerTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package org.neo4j.driver.internal.bolt.basicimpl.async.connection;
1818

1919
import static org.junit.jupiter.api.Assertions.assertEquals;
20+
import static org.junit.jupiter.api.Assertions.assertFalse;
21+
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
2022
import static org.junit.jupiter.api.Assertions.assertNotNull;
2123
import static org.junit.jupiter.api.Assertions.assertThrows;
2224
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -26,7 +28,9 @@
2628

2729
import io.netty.channel.ChannelPromise;
2830
import io.netty.channel.embedded.EmbeddedChannel;
31+
import io.netty.util.concurrent.Future;
2932
import java.io.IOException;
33+
import java.util.concurrent.CompletableFuture;
3034
import org.junit.jupiter.api.AfterEach;
3135
import org.junit.jupiter.api.Test;
3236
import org.neo4j.driver.exceptions.ServiceUnavailableException;
@@ -70,6 +74,25 @@ void shouldWriteHandshakeWhenChannelConnected() {
7074
assertEquals(handshakeBuf(), channel.readOutbound());
7175
}
7276

77+
@Test
78+
void shouldCompleteHandshakePromiseExceptionallyOnWriteFailure() {
79+
var handshakeCompletedPromise = channel.newPromise();
80+
var listener = newListener(handshakeCompletedPromise);
81+
var channelConnectedPromise = channel.newPromise();
82+
channelConnectedPromise.setSuccess();
83+
channel.close();
84+
85+
listener.operationComplete(channelConnectedPromise);
86+
87+
assertTrue(handshakeCompletedPromise.isDone());
88+
var future = new CompletableFuture<Future<?>>();
89+
handshakeCompletedPromise.addListener(future::complete);
90+
var handshakeFuture = future.join();
91+
assertTrue(handshakeFuture.isDone());
92+
assertFalse(handshakeFuture.isSuccess());
93+
assertInstanceOf(ServiceUnavailableException.class, handshakeFuture.cause());
94+
}
95+
7396
private static ChannelConnectedListener newListener(ChannelPromise handshakeCompletedPromise) {
7497
return new ChannelConnectedListener(
7598
LOCAL_DEFAULT,

0 commit comments

Comments
 (0)