Skip to content

Commit d56af61

Browse files
authored
Fix Bolt handshake write handling and timeout management (#1528) (#1546)
1 parent 2a20e31 commit d56af61

File tree

6 files changed

+50
-7
lines changed

6 files changed

+50
-7
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/connection/ChannelConnectedListener.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,17 @@
1919
package org.neo4j.driver.internal.async.connection;
2020

2121
import static java.lang.String.format;
22-
import static org.neo4j.driver.internal.async.connection.BoltProtocolUtil.handshakeBuf;
2322
import static org.neo4j.driver.internal.async.connection.BoltProtocolUtil.handshakeString;
2423

2524
import io.netty.channel.Channel;
2625
import io.netty.channel.ChannelFuture;
2726
import io.netty.channel.ChannelFutureListener;
2827
import io.netty.channel.ChannelPipeline;
2928
import io.netty.channel.ChannelPromise;
29+
import javax.net.ssl.SSLHandshakeException;
3030
import org.neo4j.driver.Logger;
3131
import org.neo4j.driver.Logging;
32+
import org.neo4j.driver.exceptions.SecurityException;
3233
import org.neo4j.driver.exceptions.ServiceUnavailableException;
3334
import org.neo4j.driver.internal.BoltServerAddress;
3435
import org.neo4j.driver.internal.logging.ChannelActivityLogger;
@@ -61,7 +62,18 @@ public void operationComplete(ChannelFuture future) {
6162
ChannelPipeline pipeline = channel.pipeline();
6263
pipeline.addLast(new HandshakeHandler(pipelineBuilder, handshakeCompletedPromise, logging));
6364
log.debug("C: [Bolt Handshake] %s", handshakeString());
64-
channel.writeAndFlush(handshakeBuf(), channel.voidPromise());
65+
channel.writeAndFlush(BoltProtocolUtil.handshakeBuf()).addListener(f -> {
66+
if (!f.isSuccess()) {
67+
Throwable error = f.cause();
68+
if (error instanceof SSLHandshakeException) {
69+
error = new SecurityException("Failed to establish secured connection with the server", error);
70+
} else {
71+
error = new ServiceUnavailableException(
72+
String.format("Unable to write Bolt handshake to %s.", this.address), error);
73+
}
74+
this.handshakeCompletedPromise.setFailure(error);
75+
}
76+
});
6577
} else {
6678
handshakeCompletedPromise.setFailure(databaseUnavailableError(address, future.cause()));
6779
}

driver/src/main/java/org/neo4j/driver/internal/async/connection/ChannelConnectorImpl.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,11 @@ private void installHandshakeCompletedListeners(
136136

137137
// remove timeout handler from the pipeline once TLS and Bolt handshakes are completed. regular protocol
138138
// messages will flow next and we do not want to have read timeout for them
139-
handshakeCompleted.addListener(future -> pipeline.remove(ConnectTimeoutHandler.class));
139+
handshakeCompleted.addListener(future -> {
140+
if (future.isSuccess()) {
141+
pipeline.remove(ConnectTimeoutHandler.class);
142+
}
143+
});
140144

141145
// add listener that sends an INIT message. connection is now fully established. channel pipeline if fully
142146
// set to send/receive messages for a selected protocol version

driver/src/test/java/org/neo4j/driver/GraphDatabaseTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import java.util.Arrays;
4141
import java.util.Iterator;
4242
import java.util.List;
43+
import org.junit.jupiter.api.Disabled;
4344
import org.junit.jupiter.api.Test;
4445
import org.neo4j.driver.exceptions.ServiceUnavailableException;
4546
import org.neo4j.driver.internal.BoltServerAddress;
@@ -152,6 +153,7 @@ void shouldFailToCreateUnencryptedDriverWhenServerDoesNotRespond() throws IOExce
152153
}
153154

154155
@Test
156+
@Disabled("TLS actually fails, the test setup is not valid")
155157
void shouldFailToCreateEncryptedDriverWhenServerDoesNotRespond() throws IOException {
156158
testFailureWhenServerDoesNotRespond(true);
157159
}

driver/src/test/java/org/neo4j/driver/integration/ChannelConnectorImplIT.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import java.util.concurrent.TimeUnit;
4646
import org.junit.jupiter.api.AfterEach;
4747
import org.junit.jupiter.api.BeforeEach;
48+
import org.junit.jupiter.api.Disabled;
4849
import org.junit.jupiter.api.Test;
4950
import org.junit.jupiter.api.extension.RegisterExtension;
5051
import org.neo4j.driver.AuthToken;
@@ -158,6 +159,7 @@ void shouldFailWhenProtocolNegotiationTakesTooLong() throws Exception {
158159
}
159160

160161
@Test
162+
@Disabled("TLS actually fails, the test setup is not valid")
161163
void shouldFailWhenTLSHandshakeTakesTooLong() throws Exception {
162164
// run with TLS so that TLS handshake is the very first operation after connection is established
163165
testReadTimeoutOnConnect(trustAllCertificates());

driver/src/test/java/org/neo4j/driver/integration/EncryptionIT.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ void shouldOperateWithEncryptionWhenItIsOptionalInTheDatabase() {
5959

6060
@Test
6161
void shouldFailWithoutEncryptionWhenItIsRequiredInTheDatabase() {
62-
testMismatchingEncryption(BoltTlsLevel.REQUIRED, false);
62+
testMismatchingEncryption(BoltTlsLevel.REQUIRED, false, "Connection to the database terminated");
6363
}
6464

6565
@Test
@@ -74,7 +74,7 @@ void shouldOperateWithEncryptionWhenConfiguredUsingBoltSscURI() {
7474

7575
@Test
7676
void shouldFailWithEncryptionWhenItIsDisabledInTheDatabase() {
77-
testMismatchingEncryption(BoltTlsLevel.DISABLED, true);
77+
testMismatchingEncryption(BoltTlsLevel.DISABLED, true, "Unable to write Bolt handshake to");
7878
}
7979

8080
@Test
@@ -110,7 +110,7 @@ private void testMatchingEncryption(BoltTlsLevel tlsLevel, boolean driverEncrypt
110110
}
111111
}
112112

113-
private void testMismatchingEncryption(BoltTlsLevel tlsLevel, boolean driverEncrypted) {
113+
private void testMismatchingEncryption(BoltTlsLevel tlsLevel, boolean driverEncrypted, String errorMessage) {
114114
Map<String, String> tlsConfig = new HashMap<>();
115115
tlsConfig.put(Neo4jSettings.BOLT_TLS_LEVEL, tlsLevel.toString());
116116
neo4j.deleteAndStartNeo4j(tlsConfig);
@@ -120,7 +120,7 @@ private void testMismatchingEncryption(BoltTlsLevel tlsLevel, boolean driverEncr
120120
ServiceUnavailableException.class, () -> GraphDatabase.driver(neo4j.uri(), neo4j.authToken(), config)
121121
.verifyConnectivity());
122122

123-
assertThat(e.getMessage(), startsWith("Connection to the database terminated"));
123+
assertThat(e.getMessage(), startsWith(errorMessage));
124124
}
125125

126126
private static Config newConfig(boolean withEncryption) {

driver/src/test/java/org/neo4j/driver/internal/async/connection/ChannelConnectedListenerTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
package org.neo4j.driver.internal.async.connection;
2020

2121
import static org.junit.jupiter.api.Assertions.assertEquals;
22+
import static org.junit.jupiter.api.Assertions.assertFalse;
23+
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
2224
import static org.junit.jupiter.api.Assertions.assertNotNull;
2325
import static org.junit.jupiter.api.Assertions.assertThrows;
2426
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -29,7 +31,9 @@
2931

3032
import io.netty.channel.ChannelPromise;
3133
import io.netty.channel.embedded.EmbeddedChannel;
34+
import io.netty.util.concurrent.Future;
3235
import java.io.IOException;
36+
import java.util.concurrent.CompletableFuture;
3337
import org.junit.jupiter.api.AfterEach;
3438
import org.junit.jupiter.api.Test;
3539
import org.neo4j.driver.exceptions.ServiceUnavailableException;
@@ -73,6 +77,25 @@ void shouldWriteHandshakeWhenChannelConnected() {
7377
assertEquals(handshakeBuf(), channel.readOutbound());
7478
}
7579

80+
@Test
81+
void shouldCompleteHandshakePromiseExceptionallyOnWriteFailure() {
82+
ChannelPromise handshakeCompletedPromise = channel.newPromise();
83+
ChannelConnectedListener listener = newListener(handshakeCompletedPromise);
84+
ChannelPromise channelConnectedPromise = channel.newPromise();
85+
channelConnectedPromise.setSuccess();
86+
channel.close();
87+
88+
listener.operationComplete(channelConnectedPromise);
89+
90+
assertTrue(handshakeCompletedPromise.isDone());
91+
CompletableFuture<Future<?>> future = new CompletableFuture<>();
92+
handshakeCompletedPromise.addListener(future::complete);
93+
Future<?> handshakeFuture = future.join();
94+
assertTrue(handshakeFuture.isDone());
95+
assertFalse(handshakeFuture.isSuccess());
96+
assertInstanceOf(ServiceUnavailableException.class, handshakeFuture.cause());
97+
}
98+
7699
private static ChannelConnectedListener newListener(ChannelPromise handshakeCompletedPromise) {
77100
return new ChannelConnectedListener(
78101
LOCAL_DEFAULT, new ChannelPipelineBuilderImpl(), handshakeCompletedPromise, DEV_NULL_LOGGING);

0 commit comments

Comments
 (0)