diff --git a/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/NettyBoltConnectionProvider.java b/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/NettyBoltConnectionProvider.java index ed557bc974..d78bb52444 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/NettyBoltConnectionProvider.java +++ b/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/NettyBoltConnectionProvider.java @@ -47,6 +47,7 @@ import org.neo4j.driver.internal.bolt.api.exception.MinVersionAcquisitionException; import org.neo4j.driver.internal.bolt.basicimpl.messaging.v4.BoltProtocolV4; import org.neo4j.driver.internal.bolt.basicimpl.messaging.v51.BoltProtocolV51; +import org.neo4j.driver.internal.bolt.basicimpl.util.FutureUtil; public final class NettyBoltConnectionProvider implements BoltConnectionProvider { private final LoggingProvider logging; @@ -149,6 +150,7 @@ public CompletionStage connect( }) .handle((connection, throwable) -> { if (throwable != null) { + throwable = FutureUtil.completionExceptionCause(throwable); log.log(System.Logger.Level.DEBUG, "Failed to establish BoltConnection " + address, throwable); throw new CompletionException(throwable); } else { diff --git a/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/NettyConnectionProvider.java b/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/NettyConnectionProvider.java index 8563760fe2..e5fcf7ae3a 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/NettyConnectionProvider.java +++ b/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/NettyConnectionProvider.java @@ -19,10 +19,9 @@ import static java.util.Objects.requireNonNull; import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelOption; -import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoopGroup; import io.netty.channel.local.LocalAddress; import io.netty.channel.local.LocalChannel; @@ -47,10 +46,10 @@ import org.neo4j.driver.internal.bolt.basicimpl.async.NetworkConnection; import org.neo4j.driver.internal.bolt.basicimpl.async.connection.ChannelConnectedListener; import org.neo4j.driver.internal.bolt.basicimpl.async.connection.ChannelPipelineBuilderImpl; -import org.neo4j.driver.internal.bolt.basicimpl.async.connection.HandshakeCompletedListener; import org.neo4j.driver.internal.bolt.basicimpl.async.connection.NettyChannelInitializer; import org.neo4j.driver.internal.bolt.basicimpl.async.connection.NettyDomainNameResolverGroup; import org.neo4j.driver.internal.bolt.basicimpl.async.inbound.ConnectTimeoutHandler; +import org.neo4j.driver.internal.bolt.basicimpl.messaging.BoltProtocol; import org.neo4j.driver.internal.bolt.basicimpl.spi.Connection; public final class NettyConnectionProvider implements ConnectionProvider { @@ -69,7 +68,7 @@ public NettyConnectionProvider( LocalAddress localAddress, LoggingProvider logging) { this.eventLoopGroup = eventLoopGroup; - this.clock = clock; + this.clock = requireNonNull(clock); this.domainNameResolver = requireNonNull(domainNameResolver); this.addressResolverGroup = new NettyDomainNameResolverGroup(this.domainNameResolver); this.localAddress = localAddress; @@ -111,42 +110,22 @@ public CompletionStage acquireConnection( socketAddress = localAddress; } - var connectedFuture = bootstrap.connect(socketAddress); - - var channel = connectedFuture.channel(); - var handshakeCompleted = channel.newPromise(); - var connectionInitialized = channel.newPromise(); - - installChannelConnectedListeners(address, connectedFuture, handshakeCompleted, connectTimeoutMillis); - installHandshakeCompletedListeners( - handshakeCompleted, - connectionInitialized, - address, - routingContext, - authMap, - boltAgent, - userAgent, - latestAuthMillisFuture, - notificationConfig); - - var future = new CompletableFuture(); - connectionInitialized.addListener((ChannelFutureListener) f -> { - var throwable = f.cause(); - if (throwable != null) { - future.completeExceptionally(throwable); - } else { - var connection = new NetworkConnection(channel, logging); - future.complete(connection); - } - }); - return future; + return installChannelConnectedListeners(address, bootstrap.connect(socketAddress), connectTimeoutMillis) + .thenCompose(channel -> BoltProtocol.forChannel(channel) + .initializeChannel( + channel, + requireNonNull(userAgent), + requireNonNull(boltAgent), + authMap, + routingContext, + notificationConfig, + clock, + latestAuthMillisFuture)) + .thenApply(channel -> new NetworkConnection(channel, logging)); } - private void installChannelConnectedListeners( - BoltServerAddress address, - ChannelFuture channelConnected, - ChannelPromise handshakeCompleted, - int connectTimeoutMillis) { + private CompletionStage installChannelConnectedListeners( + BoltServerAddress address, ChannelFuture channelConnected, int connectTimeoutMillis) { var pipeline = channelConnected.channel().pipeline(); // add timeout handler to the pipeline when channel is connected. it's needed to @@ -156,42 +135,16 @@ private void installChannelConnectedListeners( channelConnected.addListener(future -> pipeline.addFirst(new ConnectTimeoutHandler(connectTimeoutMillis))); // add listener that sends Bolt handshake bytes when channel is connected + var handshakeCompleted = new CompletableFuture(); channelConnected.addListener( new ChannelConnectedListener(address, new ChannelPipelineBuilderImpl(), handshakeCompleted, logging)); - } - - private void installHandshakeCompletedListeners( - ChannelPromise handshakeCompleted, - ChannelPromise connectionInitialized, - BoltServerAddress address, - RoutingContext routingContext, - Map authMap, - BoltAgent boltAgent, - String userAgent, - CompletableFuture latestAuthMillisFuture, - NotificationConfig notificationConfig) { - var pipeline = handshakeCompleted.channel().pipeline(); - - // remove timeout handler from the pipeline once TLS and Bolt handshakes are - // completed. regular protocol - // messages will flow next and we do not want to have read timeout for them - handshakeCompleted.addListener(future -> { - if (future.isSuccess()) { - pipeline.remove(ConnectTimeoutHandler.class); + return handshakeCompleted.whenComplete((channel, throwable) -> { + if (throwable == null) { + // remove timeout handler from the pipeline once TLS and Bolt handshakes are + // completed. regular protocol + // messages will flow next and we do not want to have read timeout for them + channel.pipeline().remove(ConnectTimeoutHandler.class); } }); - - // add listener that sends an INIT message. connection is now fully established. - // channel pipeline is fully - // set to send/receive messages for a selected protocol version - handshakeCompleted.addListener(new HandshakeCompletedListener( - authMap, - userAgent, - boltAgent, - routingContext, - connectionInitialized, - notificationConfig, - this.clock, - latestAuthMillisFuture)); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/async/connection/ChannelConnectedListener.java b/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/async/connection/ChannelConnectedListener.java index a74cb070c9..7b5b8e6607 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/async/connection/ChannelConnectedListener.java +++ b/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/async/connection/ChannelConnectedListener.java @@ -19,9 +19,10 @@ import static java.lang.String.format; import static org.neo4j.driver.internal.bolt.basicimpl.async.connection.BoltProtocolUtil.handshakeString; +import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelPromise; +import java.util.concurrent.CompletableFuture; import javax.net.ssl.SSLHandshakeException; import org.neo4j.driver.exceptions.SecurityException; import org.neo4j.driver.exceptions.ServiceUnavailableException; @@ -32,30 +33,29 @@ public class ChannelConnectedListener implements ChannelFutureListener { private final BoltServerAddress address; private final ChannelPipelineBuilder pipelineBuilder; - private final ChannelPromise handshakeCompletedPromise; + private final CompletableFuture handshakeCompletedFuture; private final LoggingProvider logging; public ChannelConnectedListener( BoltServerAddress address, ChannelPipelineBuilder pipelineBuilder, - ChannelPromise handshakeCompletedPromise, + CompletableFuture handshakeCompletedFuture, LoggingProvider logging) { this.address = address; this.pipelineBuilder = pipelineBuilder; - this.handshakeCompletedPromise = handshakeCompletedPromise; + this.handshakeCompletedFuture = handshakeCompletedFuture; this.logging = logging; } @Override public void operationComplete(ChannelFuture future) { - var channel = future.channel(); - var log = new ChannelActivityLogger(channel, logging, getClass()); - if (future.isSuccess()) { + var channel = future.channel(); + var log = new ChannelActivityLogger(channel, logging, getClass()); log.log(System.Logger.Level.TRACE, "Channel %s connected, initiating bolt handshake", channel); var pipeline = channel.pipeline(); - pipeline.addLast(new HandshakeHandler(pipelineBuilder, handshakeCompletedPromise, logging)); + pipeline.addLast(new HandshakeHandler(pipelineBuilder, handshakeCompletedFuture, logging)); log.log(System.Logger.Level.DEBUG, "C: [Bolt Handshake] %s", handshakeString()); channel.writeAndFlush(BoltProtocolUtil.handshakeBuf()).addListener(f -> { if (!f.isSuccess()) { @@ -66,11 +66,11 @@ public void operationComplete(ChannelFuture future) { error = new ServiceUnavailableException( String.format("Unable to write Bolt handshake to %s.", this.address), error); } - this.handshakeCompletedPromise.setFailure(error); + this.handshakeCompletedFuture.completeExceptionally(error); } }); } else { - handshakeCompletedPromise.setFailure(databaseUnavailableError(address, future.cause())); + handshakeCompletedFuture.completeExceptionally(databaseUnavailableError(address, future.cause())); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/async/connection/HandshakeCompletedListener.java b/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/async/connection/HandshakeCompletedListener.java deleted file mode 100644 index 460ae3fd56..0000000000 --- a/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/async/connection/HandshakeCompletedListener.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright (c) "Neo4j" - * Neo4j Sweden AB [https://neo4j.com] - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.driver.internal.bolt.basicimpl.async.connection; - -import static java.util.Objects.requireNonNull; - -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelPromise; -import java.time.Clock; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import org.neo4j.driver.Value; -import org.neo4j.driver.internal.bolt.api.BoltAgent; -import org.neo4j.driver.internal.bolt.api.NotificationConfig; -import org.neo4j.driver.internal.bolt.api.RoutingContext; -import org.neo4j.driver.internal.bolt.basicimpl.messaging.BoltProtocol; - -public class HandshakeCompletedListener implements ChannelFutureListener { - private final Map authMap; - private final String userAgent; - private final BoltAgent boltAgent; - private final RoutingContext routingContext; - private final ChannelPromise connectionInitializedPromise; - private final NotificationConfig notificationConfig; - private final Clock clock; - private final CompletableFuture latestAuthMillisFuture; - - public HandshakeCompletedListener( - Map authMap, - String userAgent, - BoltAgent boltAgent, - RoutingContext routingContext, - ChannelPromise connectionInitializedPromise, - NotificationConfig notificationConfig, - Clock clock, - CompletableFuture latestAuthMillisFuture) { - requireNonNull(clock, "clock must not be null"); - this.authMap = authMap; - this.userAgent = requireNonNull(userAgent); - this.boltAgent = requireNonNull(boltAgent); - this.routingContext = routingContext; - this.connectionInitializedPromise = requireNonNull(connectionInitializedPromise); - this.notificationConfig = notificationConfig; - this.clock = clock; - this.latestAuthMillisFuture = latestAuthMillisFuture; - } - - @Override - public void operationComplete(ChannelFuture future) { - if (future.isSuccess()) { - var protocol = BoltProtocol.forChannel(future.channel()); - protocol.initializeChannel( - userAgent, - boltAgent, - authMap, - routingContext, - connectionInitializedPromise, - notificationConfig, - clock, - latestAuthMillisFuture); - } else { - connectionInitializedPromise.setFailure(future.cause()); - } - } -} diff --git a/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/async/connection/HandshakeHandler.java b/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/async/connection/HandshakeHandler.java index e0873a3731..12896667d5 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/async/connection/HandshakeHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/async/connection/HandshakeHandler.java @@ -19,11 +19,12 @@ import static org.neo4j.driver.internal.bolt.api.BoltProtocolVersion.isHttp; import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPromise; import io.netty.handler.codec.DecoderException; import io.netty.handler.codec.ReplayingDecoder; import java.util.List; +import java.util.concurrent.CompletableFuture; import javax.net.ssl.SSLHandshakeException; import org.neo4j.driver.exceptions.ClientException; import org.neo4j.driver.exceptions.SecurityException; @@ -39,7 +40,7 @@ public class HandshakeHandler extends ReplayingDecoder { private final ChannelPipelineBuilder pipelineBuilder; - private final ChannelPromise handshakeCompletedPromise; + private final CompletableFuture handshakeCompletedFuture; private final LoggingProvider logging; private boolean failed; @@ -47,9 +48,11 @@ public class HandshakeHandler extends ReplayingDecoder { private ChannelErrorLogger errorLog; public HandshakeHandler( - ChannelPipelineBuilder pipelineBuilder, ChannelPromise handshakeCompletedPromise, LoggingProvider logging) { + ChannelPipelineBuilder pipelineBuilder, + CompletableFuture handshakeCompletedFuture, + LoggingProvider logging) { this.pipelineBuilder = pipelineBuilder; - this.handshakeCompletedPromise = handshakeCompletedPromise; + this.handshakeCompletedFuture = handshakeCompletedFuture; this.logging = logging; } @@ -114,7 +117,7 @@ private BoltProtocol protocolForVersion(BoltProtocolVersion version) { private void protocolSelected(BoltProtocolVersion version, MessageFormat messageFormat, ChannelHandlerContext ctx) { ChannelAttributes.setProtocolVersion(ctx.channel(), version); pipelineBuilder.build(messageFormat, ctx.pipeline(), logging); - handshakeCompletedPromise.setSuccess(); + handshakeCompletedFuture.complete(ctx.channel()); } private void handleUnknownSuggestedProtocolVersion(BoltProtocolVersion version, ChannelHandlerContext ctx) { @@ -128,7 +131,7 @@ private void handleUnknownSuggestedProtocolVersion(BoltProtocolVersion version, } private void fail(ChannelHandlerContext ctx, Throwable error) { - ctx.close().addListener(future -> handshakeCompletedPromise.tryFailure(error)); + ctx.close().addListener(future -> handshakeCompletedFuture.completeExceptionally(error)); } private static Throwable protocolNoSupportedByServerError() { diff --git a/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/BoltProtocol.java b/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/BoltProtocol.java index cf00851d09..1459efedaf 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/BoltProtocol.java +++ b/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/BoltProtocol.java @@ -19,7 +19,6 @@ import static org.neo4j.driver.internal.bolt.basicimpl.async.connection.ChannelAttributes.protocolVersion; import io.netty.channel.Channel; -import io.netty.channel.ChannelPromise; import java.time.Clock; import java.time.Duration; import java.util.Map; @@ -59,12 +58,12 @@ public interface BoltProtocol { MessageFormat createMessageFormat(); - void initializeChannel( + CompletionStage initializeChannel( + Channel channel, String userAgent, BoltAgent boltAgent, Map authMap, RoutingContext routingContext, - ChannelPromise channelInitializedPromise, NotificationConfig notificationConfig, Clock clock, CompletableFuture latestAuthMillisFuture); diff --git a/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v3/BoltProtocolV3.java b/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v3/BoltProtocolV3.java index 8a27688073..9ba067ea92 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v3/BoltProtocolV3.java +++ b/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v3/BoltProtocolV3.java @@ -21,7 +21,7 @@ import static org.neo4j.driver.internal.bolt.basicimpl.messaging.request.CommitMessage.COMMIT; import static org.neo4j.driver.internal.bolt.basicimpl.messaging.request.RollbackMessage.ROLLBACK; -import io.netty.channel.ChannelPromise; +import io.netty.channel.Channel; import java.time.Clock; import java.time.Duration; import java.util.ArrayList; @@ -91,21 +91,19 @@ public MessageFormat createMessageFormat() { } @Override - public void initializeChannel( + public CompletionStage initializeChannel( + Channel channel, String userAgent, BoltAgent boltAgent, Map authMap, RoutingContext routingContext, - ChannelPromise channelInitializedPromise, NotificationConfig notificationConfig, Clock clock, CompletableFuture latestAuthMillisFuture) { var exception = verifyNotificationConfigSupported(notificationConfig); if (exception != null) { - channelInitializedPromise.setFailure(exception); - return; + return CompletableFuture.failedStage(exception); } - var channel = channelInitializedPromise.channel(); HelloMessage message; if (routingContext.isServerRoutingEnabled()) { @@ -132,13 +130,7 @@ public void initializeChannel( var handler = new HelloResponseHandler(future, channel, clock, latestAuthMillisFuture); messageDispatcher(channel).enqueue(handler); channel.writeAndFlush(message, channel.voidPromise()); - future.whenComplete((serverAgent, throwable) -> { - if (throwable != null) { - channelInitializedPromise.setFailure(throwable); - } else { - channelInitializedPromise.setSuccess(); - } - }); + return future.thenApply(ignored -> channel); } @SuppressWarnings("DuplicatedCode") diff --git a/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v51/BoltProtocolV51.java b/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v51/BoltProtocolV51.java index bda56cb24f..1c83aa17f9 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v51/BoltProtocolV51.java +++ b/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v51/BoltProtocolV51.java @@ -18,7 +18,7 @@ import static org.neo4j.driver.internal.bolt.basicimpl.async.connection.ChannelAttributes.messageDispatcher; -import io.netty.channel.ChannelPromise; +import io.netty.channel.Channel; import java.time.Clock; import java.util.Collections; import java.util.Map; @@ -47,21 +47,19 @@ public class BoltProtocolV51 extends BoltProtocolV5 { @SuppressWarnings("DuplicatedCode") @Override - public void initializeChannel( + public CompletionStage initializeChannel( + Channel channel, String userAgent, BoltAgent boltAgent, Map authMap, RoutingContext routingContext, - ChannelPromise channelInitializedPromise, NotificationConfig notificationConfig, Clock clock, CompletableFuture latestAuthMillisFuture) { var exception = verifyNotificationConfigSupported(notificationConfig); if (exception != null) { - channelInitializedPromise.setFailure(exception); - return; + return CompletableFuture.failedStage(exception); } - var channel = channelInitializedPromise.channel(); HelloMessage message; if (routingContext.isServerRoutingEnabled()) { @@ -88,13 +86,7 @@ public void initializeChannel( .enqueue(new LogonResponseHandler(logonFuture, channel, clock, latestAuthMillisFuture)); channel.writeAndFlush(logon, channel.voidPromise()); - helloFuture.thenCompose(ignored -> logonFuture).whenComplete((ignored, throwable) -> { - if (throwable != null) { - channelInitializedPromise.setFailure(throwable); - } else { - channelInitializedPromise.setSuccess(); - } - }); + return helloFuture.thenCompose(ignored -> logonFuture).thenApply(ignored -> channel); } @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v53/BoltProtocolV53.java b/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v53/BoltProtocolV53.java index aa61fd87a2..a95b53fe69 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v53/BoltProtocolV53.java +++ b/driver/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v53/BoltProtocolV53.java @@ -18,11 +18,12 @@ import static org.neo4j.driver.internal.bolt.basicimpl.async.connection.ChannelAttributes.messageDispatcher; -import io.netty.channel.ChannelPromise; +import io.netty.channel.Channel; import java.time.Clock; import java.util.Collections; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import org.neo4j.driver.Value; import org.neo4j.driver.internal.bolt.api.BoltAgent; import org.neo4j.driver.internal.bolt.api.BoltProtocolVersion; @@ -41,21 +42,19 @@ public class BoltProtocolV53 extends BoltProtocolV52 { @SuppressWarnings("DuplicatedCode") @Override - public void initializeChannel( + public CompletionStage initializeChannel( + Channel channel, String userAgent, BoltAgent boltAgent, Map authMap, RoutingContext routingContext, - ChannelPromise channelInitializedPromise, NotificationConfig notificationConfig, Clock clock, CompletableFuture latestAuthMillisFuture) { var exception = verifyNotificationConfigSupported(notificationConfig); if (exception != null) { - channelInitializedPromise.setFailure(exception); - return; + return CompletableFuture.failedStage(exception); } - var channel = channelInitializedPromise.channel(); HelloMessage message; if (routingContext.isServerRoutingEnabled()) { @@ -88,13 +87,7 @@ public void initializeChannel( .enqueue(new LogonResponseHandler(logonFuture, channel, clock, latestAuthMillisFuture)); channel.writeAndFlush(logon, channel.voidPromise()); - helloFuture.thenCompose(ignored -> logonFuture).whenComplete((ignored, throwable) -> { - if (throwable != null) { - channelInitializedPromise.setFailure(throwable); - } else { - channelInitializedPromise.setSuccess(); - } - }); + return helloFuture.thenCompose(ignored -> logonFuture).thenApply(ignored -> channel); } @Override diff --git a/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/async/connection/ChannelConnectedListenerTest.java b/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/async/connection/ChannelConnectedListenerTest.java index d23468f95d..f820097e9d 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/async/connection/ChannelConnectedListenerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/async/connection/ChannelConnectedListenerTest.java @@ -17,20 +17,25 @@ package org.neo4j.driver.internal.bolt.basicimpl.async.connection; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; import static org.neo4j.driver.internal.bolt.api.BoltServerAddress.LOCAL_DEFAULT; import static org.neo4j.driver.internal.bolt.basicimpl.async.connection.BoltProtocolUtil.handshakeBuf; import static org.neo4j.driver.testutil.TestUtil.await; +import io.netty.channel.Channel; import io.netty.channel.ChannelPromise; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; import java.io.IOException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; +import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.neo4j.driver.exceptions.ServiceUnavailableException; @@ -46,8 +51,8 @@ void tearDown() { @Test void shouldFailPromiseWhenChannelConnectionFails() { - var handshakeCompletedPromise = channel.newPromise(); - var listener = newListener(handshakeCompletedPromise); + var handshakeCompletedFuture = new CompletableFuture(); + var listener = newListener(handshakeCompletedFuture); var channelConnectedPromise = channel.newPromise(); var cause = new IOException("Unable to connect!"); @@ -55,14 +60,14 @@ void shouldFailPromiseWhenChannelConnectionFails() { listener.operationComplete(channelConnectedPromise); - var error = assertThrows(ServiceUnavailableException.class, () -> await(handshakeCompletedPromise)); + var error = assertThrows(ServiceUnavailableException.class, () -> await(handshakeCompletedFuture)); assertEquals(cause, error.getCause()); } @Test void shouldWriteHandshakeWhenChannelConnected() { - var handshakeCompletedPromise = channel.newPromise(); - var listener = newListener(handshakeCompletedPromise); + var handshakeCompletedFuture = new CompletableFuture(); + var listener = newListener(handshakeCompletedFuture); var channelConnectedPromise = channel.newPromise(); channelConnectedPromise.setSuccess(); @@ -76,28 +81,199 @@ void shouldWriteHandshakeWhenChannelConnected() { @Test void shouldCompleteHandshakePromiseExceptionallyOnWriteFailure() { - var handshakeCompletedPromise = channel.newPromise(); - var listener = newListener(handshakeCompletedPromise); + var handshakeCompletedFuture = new CompletableFuture(); + var listener = newListener(handshakeCompletedFuture); var channelConnectedPromise = channel.newPromise(); channelConnectedPromise.setSuccess(); channel.close(); listener.operationComplete(channelConnectedPromise); - assertTrue(handshakeCompletedPromise.isDone()); - var future = new CompletableFuture>(); - handshakeCompletedPromise.addListener(future::complete); - var handshakeFuture = future.join(); - assertTrue(handshakeFuture.isDone()); - assertFalse(handshakeFuture.isSuccess()); - assertInstanceOf(ServiceUnavailableException.class, handshakeFuture.cause()); + assertTrue(handshakeCompletedFuture.isCompletedExceptionally()); + var exception = assertThrows(CompletionException.class, handshakeCompletedFuture::join); + assertInstanceOf(ServiceUnavailableException.class, exception.getCause()); } - private static ChannelConnectedListener newListener(ChannelPromise handshakeCompletedPromise) { + @Test + void shouldCompleteFutureExceptionallyOnFailedPromise() { + var future = new CompletableFuture(); + var listener = newListener(future); + var throwable = mock(Throwable.class); + + listener.operationComplete(new FailedPromise(throwable)); + + assertTrue(future.isCompletedExceptionally()); + Throwable exception = assertThrows(CompletionException.class, future::join); + assertInstanceOf(ServiceUnavailableException.class, exception.getCause()); + exception = exception.getCause(); + assertEquals(throwable, exception.getCause()); + } + + private static ChannelConnectedListener newListener(CompletableFuture handshakeCompletedFuture) { return new ChannelConnectedListener( LOCAL_DEFAULT, new ChannelPipelineBuilderImpl(), - handshakeCompletedPromise, + handshakeCompletedFuture, NoopLoggingProvider.INSTANCE); } + + private record FailedPromise(Throwable failure) implements ChannelPromise { + @Override + public Channel channel() { + return null; + } + + @Override + public ChannelPromise setSuccess(Void result) { + return null; + } + + @Override + public boolean trySuccess(Void result) { + return false; + } + + @Override + public ChannelPromise setSuccess() { + return null; + } + + @Override + public boolean trySuccess() { + return false; + } + + @Override + public ChannelPromise setFailure(Throwable cause) { + return null; + } + + @Override + public boolean tryFailure(Throwable cause) { + return false; + } + + @Override + public boolean setUncancellable() { + return false; + } + + @Override + public boolean isSuccess() { + return false; + } + + @Override + public boolean isCancellable() { + return false; + } + + @Override + public Throwable cause() { + return failure; + } + + @Override + public ChannelPromise addListener(GenericFutureListener> listener) { + return null; + } + + @Override + @SafeVarargs + public final ChannelPromise addListeners(GenericFutureListener>... listeners) { + return null; + } + + @Override + public ChannelPromise removeListener(GenericFutureListener> listener) { + return null; + } + + @Override + @SafeVarargs + public final ChannelPromise removeListeners( + GenericFutureListener>... listeners) { + return null; + } + + @Override + public ChannelPromise sync() { + return null; + } + + @Override + public ChannelPromise syncUninterruptibly() { + return null; + } + + @Override + public ChannelPromise await() { + return null; + } + + @Override + public ChannelPromise awaitUninterruptibly() { + return null; + } + + @Override + public boolean await(long timeout, TimeUnit unit) { + return false; + } + + @Override + public boolean await(long timeoutMillis) { + return false; + } + + @Override + public boolean awaitUninterruptibly(long timeout, TimeUnit unit) { + return false; + } + + @Override + public boolean awaitUninterruptibly(long timeoutMillis) { + return false; + } + + @Override + public Void getNow() { + return null; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return false; + } + + @Override + public Void get() { + return null; + } + + @Override + public Void get(long timeout, @NotNull TimeUnit unit) { + return null; + } + + @Override + public boolean isVoid() { + return false; + } + + @Override + public ChannelPromise unvoid() { + return null; + } + } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/async/connection/HandshakeCompletedListenerTest.java b/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/async/connection/HandshakeCompletedListenerTest.java deleted file mode 100644 index 50c55bbab4..0000000000 --- a/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/async/connection/HandshakeCompletedListenerTest.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Copyright (c) "Neo4j" - * Neo4j Sweden AB [https://neo4j.com] - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.driver.internal.bolt.basicimpl.async.connection; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.neo4j.driver.internal.bolt.basicimpl.async.connection.ChannelAttributes.setMessageDispatcher; -import static org.neo4j.driver.internal.bolt.basicimpl.async.connection.ChannelAttributes.setProtocolVersion; -import static org.neo4j.driver.testutil.TestUtil.await; - -import io.netty.channel.embedded.EmbeddedChannel; -import java.io.IOException; -import java.time.Clock; -import java.util.Collections; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Test; -import org.neo4j.driver.Value; -import org.neo4j.driver.Values; -import org.neo4j.driver.internal.bolt.api.BoltAgentUtil; -import org.neo4j.driver.internal.bolt.api.RoutingContext; -import org.neo4j.driver.internal.bolt.basicimpl.async.inbound.InboundMessageDispatcher; -import org.neo4j.driver.internal.bolt.basicimpl.handlers.HelloResponseHandler; -import org.neo4j.driver.internal.bolt.basicimpl.messaging.request.HelloMessage; -import org.neo4j.driver.internal.bolt.basicimpl.messaging.v3.BoltProtocolV3; -import org.neo4j.driver.internal.bolt.basicimpl.spi.ResponseHandler; - -class HandshakeCompletedListenerTest { - private static final String USER_AGENT = "user-agent"; - - private final EmbeddedChannel channel = new EmbeddedChannel(); - - @AfterEach - void tearDown() { - channel.finishAndReleaseAll(); - } - - @Test - void shouldFailConnectionInitializedPromiseWhenHandshakeFails() { - var channelInitializedPromise = channel.newPromise(); - var listener = new HandshakeCompletedListener( - Collections.emptyMap(), - USER_AGENT, - BoltAgentUtil.VALUE, - RoutingContext.EMPTY, - channelInitializedPromise, - null, - mock(Clock.class), - new CompletableFuture<>()); - - var handshakeCompletedPromise = channel.newPromise(); - var cause = new IOException("Bad handshake"); - handshakeCompletedPromise.setFailure(cause); - - listener.operationComplete(handshakeCompletedPromise); - - var error = assertThrows(Exception.class, () -> await(channelInitializedPromise)); - assertEquals(cause, error); - } - - @Test - void shouldWriteInitializationMessageInBoltV3WhenHandshakeCompleted() { - var expectedMessage = - new HelloMessage(USER_AGENT, null, authToken(), Collections.emptyMap(), false, null, false); - var messageDispatcher = mock(InboundMessageDispatcher.class); - setProtocolVersion(channel, BoltProtocolV3.VERSION); - setMessageDispatcher(channel, messageDispatcher); - - var channelInitializedPromise = channel.newPromise(); - var listener = new HandshakeCompletedListener( - authToken(), - USER_AGENT, - BoltAgentUtil.VALUE, - RoutingContext.EMPTY, - channelInitializedPromise, - null, - mock(Clock.class), - new CompletableFuture<>()); - - var handshakeCompletedPromise = channel.newPromise(); - handshakeCompletedPromise.setSuccess(); - - listener.operationComplete(handshakeCompletedPromise); - assertTrue(channel.finish()); - - verify(messageDispatcher).enqueue(any((Class) HelloResponseHandler.class)); - var outboundMessage = channel.readOutbound(); - assertEquals(expectedMessage, outboundMessage); - } - - private static Map authToken() { - return Map.of("neo4j", Values.value("secret")); - } -} diff --git a/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/async/connection/HandshakeHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/async/connection/HandshakeHandlerTest.java index a52edf2b10..21505ea403 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/async/connection/HandshakeHandlerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/async/connection/HandshakeHandlerTest.java @@ -29,11 +29,12 @@ import static org.neo4j.driver.internal.bolt.basicimpl.async.connection.ChannelAttributes.setMessageDispatcher; import static org.neo4j.driver.testutil.TestUtil.await; +import io.netty.channel.Channel; import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelPromise; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.DecoderException; import java.io.IOException; +import java.util.concurrent.CompletableFuture; import java.util.stream.Stream; import javax.net.ssl.SSLHandshakeException; import org.junit.jupiter.api.AfterEach; @@ -77,15 +78,15 @@ void tearDown() { @Test void shouldFailGivenPromiseWhenExceptionCaught() { - var handshakeCompletedPromise = channel.newPromise(); - var handler = newHandler(handshakeCompletedPromise); + var handshakeCompletedFuture = new CompletableFuture(); + var handler = newHandler(handshakeCompletedFuture); channel.pipeline().addLast(handler); var cause = new RuntimeException("Error!"); channel.pipeline().fireExceptionCaught(cause); // promise should fail - var error = assertThrows(ServiceUnavailableException.class, () -> await(handshakeCompletedPromise)); + var error = assertThrows(ServiceUnavailableException.class, () -> await(handshakeCompletedFuture)); assertEquals(cause, error.getCause()); // channel should be closed @@ -94,15 +95,15 @@ void shouldFailGivenPromiseWhenExceptionCaught() { @Test void shouldFailGivenPromiseWhenServiceUnavailableExceptionCaught() { - var handshakeCompletedPromise = channel.newPromise(); - var handler = newHandler(handshakeCompletedPromise); + var handshakeCompletedFuture = new CompletableFuture(); + var handler = newHandler(handshakeCompletedFuture); channel.pipeline().addLast(handler); var error = new ServiceUnavailableException("Bad error"); channel.pipeline().fireExceptionCaught(error); // promise should fail - var e = assertThrows(ServiceUnavailableException.class, () -> await(handshakeCompletedPromise)); + var e = assertThrows(ServiceUnavailableException.class, () -> await(handshakeCompletedFuture)); assertEquals(error, e); // channel should be closed @@ -111,8 +112,8 @@ void shouldFailGivenPromiseWhenServiceUnavailableExceptionCaught() { @Test void shouldFailGivenPromiseWhenMultipleExceptionsCaught() { - var handshakeCompletedPromise = channel.newPromise(); - var handler = newHandler(handshakeCompletedPromise); + var handshakeCompletedFuture = new CompletableFuture(); + var handler = newHandler(handshakeCompletedFuture); channel.pipeline().addLast(handler); var error1 = new RuntimeException("Error 1"); @@ -121,7 +122,7 @@ void shouldFailGivenPromiseWhenMultipleExceptionsCaught() { channel.pipeline().fireExceptionCaught(error2); // promise should fail - var e1 = assertThrows(ServiceUnavailableException.class, () -> await(handshakeCompletedPromise)); + var e1 = assertThrows(ServiceUnavailableException.class, () -> await(handshakeCompletedFuture)); assertEquals(error1, e1.getCause()); // channel should be closed @@ -133,15 +134,15 @@ void shouldFailGivenPromiseWhenMultipleExceptionsCaught() { @Test void shouldUnwrapDecoderException() { - var handshakeCompletedPromise = channel.newPromise(); - var handler = newHandler(handshakeCompletedPromise); + var handshakeCompletedFuture = new CompletableFuture(); + var handler = newHandler(handshakeCompletedFuture); channel.pipeline().addLast(handler); var cause = new IOException("Error!"); channel.pipeline().fireExceptionCaught(new DecoderException(cause)); // promise should fail - var error = assertThrows(ServiceUnavailableException.class, () -> await(handshakeCompletedPromise)); + var error = assertThrows(ServiceUnavailableException.class, () -> await(handshakeCompletedFuture)); assertEquals(cause, error.getCause()); // channel should be closed @@ -150,14 +151,14 @@ void shouldUnwrapDecoderException() { @Test void shouldHandleDecoderExceptionWithoutCause() { - var handshakeCompletedPromise = channel.newPromise(); - var handler = newHandler(handshakeCompletedPromise); + var handshakeCompletedFuture = new CompletableFuture(); + var handler = newHandler(handshakeCompletedFuture); channel.pipeline().addLast(handler); var decoderException = new DecoderException("Unable to decode a message"); channel.pipeline().fireExceptionCaught(decoderException); - var error = assertThrows(ServiceUnavailableException.class, () -> await(handshakeCompletedPromise)); + var error = assertThrows(ServiceUnavailableException.class, () -> await(handshakeCompletedFuture)); assertEquals(decoderException, error.getCause()); // channel should be closed @@ -166,15 +167,15 @@ void shouldHandleDecoderExceptionWithoutCause() { @Test void shouldTranslateSSLHandshakeException() { - var handshakeCompletedPromise = channel.newPromise(); - var handler = newHandler(handshakeCompletedPromise); + var handshakeCompletedFuture = new CompletableFuture(); + var handler = newHandler(handshakeCompletedFuture); channel.pipeline().addLast(handler); var error = new SSLHandshakeException("Invalid certificate"); channel.pipeline().fireExceptionCaught(error); // promise should fail - var e = assertThrows(SecurityException.class, () -> await(handshakeCompletedPromise)); + var e = assertThrows(SecurityException.class, () -> await(handshakeCompletedFuture)); assertEquals(error, e.getCause()); // channel should be closed @@ -185,9 +186,9 @@ void shouldTranslateSSLHandshakeException() { @MethodSource("protocolVersions") public void testProtocolSelection( BoltProtocolVersion protocolVersion, Class expectedMessageFormatClass) { - var handshakeCompletedPromise = channel.newPromise(); + var handshakeCompletedFuture = new CompletableFuture(); var pipelineBuilder = new MemorizingChannelPipelineBuilder(); - var handler = newHandler(pipelineBuilder, handshakeCompletedPromise); + var handler = newHandler(pipelineBuilder, handshakeCompletedFuture); channel.pipeline().addLast(handler); channel.pipeline().fireChannelRead(copyInt(protocolVersion.toInt())); @@ -207,7 +208,7 @@ public void testProtocolSelection( assertNotNull(channel.pipeline().get(OutboundMessageHandler.class)); // promise should be successful - assertNull(await(handshakeCompletedPromise)); + assertEquals(channel, await(handshakeCompletedFuture)); } @Test @@ -227,14 +228,14 @@ void shouldFailGivenPromiseWhenServerSuggestsUnknownProtocol() { @Test void shouldFailGivenPromiseWhenChannelInactive() { - var handshakeCompletedPromise = channel.newPromise(); - var handler = newHandler(handshakeCompletedPromise); + var handshakeCompletedFuture = new CompletableFuture(); + var handler = newHandler(handshakeCompletedFuture); channel.pipeline().addLast(handler); channel.pipeline().fireChannelInactive(); // promise should fail - var error = assertThrows(ServiceUnavailableException.class, () -> await(handshakeCompletedPromise)); + var error = assertThrows(ServiceUnavailableException.class, () -> await(handshakeCompletedFuture)); assertEquals(ErrorUtil.newConnectionTerminatedError().getMessage(), error.getMessage()); // channel should be closed @@ -242,8 +243,8 @@ void shouldFailGivenPromiseWhenChannelInactive() { } private void testFailure(BoltProtocolVersion serverSuggestedVersion, String expectedMessagePrefix) { - var handshakeCompletedPromise = channel.newPromise(); - var handler = newHandler(handshakeCompletedPromise); + var handshakeCompletedFuture = new CompletableFuture(); + var handler = newHandler(handshakeCompletedFuture); channel.pipeline().addLast(handler); channel.pipeline().fireChannelRead(copyInt(serverSuggestedVersion.toInt())); @@ -252,7 +253,7 @@ private void testFailure(BoltProtocolVersion serverSuggestedVersion, String expe assertNull(channel.pipeline().get(HandshakeHandler.class)); // promise should fail - var error = assertThrows(Exception.class, () -> await(handshakeCompletedPromise)); + var error = assertThrows(Exception.class, () -> await(handshakeCompletedFuture)); assertThat(error, instanceOf(ClientException.class)); assertThat(error.getMessage(), startsWith(expectedMessagePrefix)); @@ -268,12 +269,12 @@ private static Stream protocolVersions() { arguments(BoltProtocolV42.VERSION, MessageFormatV4.class)); } - private static HandshakeHandler newHandler(ChannelPromise handshakeCompletedPromise) { - return newHandler(new ChannelPipelineBuilderImpl(), handshakeCompletedPromise); + private static HandshakeHandler newHandler(CompletableFuture handshakeCompletedFuture) { + return newHandler(new ChannelPipelineBuilderImpl(), handshakeCompletedFuture); } private static HandshakeHandler newHandler( - ChannelPipelineBuilder pipelineBuilder, ChannelPromise handshakeCompletedPromise) { + ChannelPipelineBuilder pipelineBuilder, CompletableFuture handshakeCompletedPromise) { return new HandshakeHandler(pipelineBuilder, handshakeCompletedPromise, NoopLoggingProvider.INSTANCE); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v3/BoltProtocolV3Test.java b/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v3/BoltProtocolV3Test.java index 5124cf49fb..7fa874083a 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v3/BoltProtocolV3Test.java +++ b/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v3/BoltProtocolV3Test.java @@ -123,27 +123,27 @@ void shouldCreateMessageFormat() { @Test void shouldInitializeChannel() { - var promise = channel.newPromise(); var clock = mock(Clock.class); var time = 1L; when(clock.millis()).thenReturn(time); var latestAuthMillisFuture = new CompletableFuture(); - protocol.initializeChannel( - "MyDriver/0.0.1", - null, - Collections.emptyMap(), - RoutingContext.EMPTY, - promise, - null, - clock, - latestAuthMillisFuture); + var future = protocol.initializeChannel( + channel, + "MyDriver/0.0.1", + null, + Collections.emptyMap(), + RoutingContext.EMPTY, + null, + clock, + latestAuthMillisFuture) + .toCompletableFuture(); assertThat(channel.outboundMessages(), hasSize(1)); assertThat(channel.outboundMessages().poll(), instanceOf(HelloMessage.class)); assertEquals(1, messageDispatcher.queuedHandlersCount()); - assertFalse(promise.isDone()); + assertFalse(future.isDone()); var metadata = Map.of( "server", value("Neo4j/3.5.0"), @@ -151,8 +151,8 @@ void shouldInitializeChannel() { messageDispatcher.handleSuccessMessage(metadata); - assertTrue(promise.isDone()); - assertTrue(promise.isSuccess()); + assertTrue(future.isDone()); + assertEquals(channel, future.join()); verify(clock).millis(); assertTrue(latestAuthMillisFuture.isDone()); assertEquals(time, latestAuthMillisFuture.join()); @@ -160,28 +160,27 @@ void shouldInitializeChannel() { @Test void shouldFailToInitializeChannelWhenErrorIsReceived() { - var promise = channel.newPromise(); - - protocol.initializeChannel( - "MyDriver/2.2.1", - null, - Collections.emptyMap(), - RoutingContext.EMPTY, - promise, - null, - mock(Clock.class), - new CompletableFuture<>()); + var future = protocol.initializeChannel( + channel, + "MyDriver/2.2.1", + null, + Collections.emptyMap(), + RoutingContext.EMPTY, + null, + mock(Clock.class), + new CompletableFuture<>()) + .toCompletableFuture(); assertThat(channel.outboundMessages(), hasSize(1)); assertThat(channel.outboundMessages().poll(), instanceOf(HelloMessage.class)); assertEquals(1, messageDispatcher.queuedHandlersCount()); - assertFalse(promise.isDone()); + assertFalse(future.isDone()); messageDispatcher.handleFailureMessage( new GqlError("Neo.TransientError.General.DatabaseUnavailable", "Error!")); - assertTrue(promise.isDone()); - assertFalse(promise.isSuccess()); + assertTrue(future.isDone()); + assertTrue(future.isCompletedExceptionally()); } @Test diff --git a/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v4/BoltProtocolV4Test.java b/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v4/BoltProtocolV4Test.java index 6865294115..18c0c4325a 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v4/BoltProtocolV4Test.java +++ b/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v4/BoltProtocolV4Test.java @@ -109,27 +109,27 @@ void shouldCreateMessageFormat() { @Test void shouldInitializeChannel() { - var promise = channel.newPromise(); var clock = mock(Clock.class); var time = 1L; when(clock.millis()).thenReturn(time); var latestAuthMillisFuture = new CompletableFuture(); - protocol.initializeChannel( - "MyDriver/0.0.1", - null, - Collections.emptyMap(), - RoutingContext.EMPTY, - promise, - null, - clock, - latestAuthMillisFuture); + var future = protocol.initializeChannel( + channel, + "MyDriver/0.0.1", + null, + Collections.emptyMap(), + RoutingContext.EMPTY, + null, + clock, + latestAuthMillisFuture) + .toCompletableFuture(); assertThat(channel.outboundMessages(), hasSize(1)); assertThat(channel.outboundMessages().poll(), instanceOf(HelloMessage.class)); assertEquals(1, messageDispatcher.queuedHandlersCount()); - assertFalse(promise.isDone()); + assertFalse(future.isDone()); var metadata = Map.of( "server", value("Neo4j/3.5.0"), @@ -137,8 +137,8 @@ void shouldInitializeChannel() { messageDispatcher.handleSuccessMessage(metadata); - assertTrue(promise.isDone()); - assertTrue(promise.isSuccess()); + assertTrue(future.isDone()); + assertEquals(channel, future.join()); verify(clock).millis(); assertTrue(latestAuthMillisFuture.isDone()); assertEquals(time, latestAuthMillisFuture.join()); @@ -146,28 +146,27 @@ void shouldInitializeChannel() { @Test void shouldFailToInitializeChannelWhenErrorIsReceived() { - var promise = channel.newPromise(); - - protocol.initializeChannel( - "MyDriver/2.2.1", - null, - Collections.emptyMap(), - RoutingContext.EMPTY, - promise, - null, - mock(Clock.class), - new CompletableFuture<>()); + var future = protocol.initializeChannel( + channel, + "MyDriver/2.2.1", + null, + Collections.emptyMap(), + RoutingContext.EMPTY, + null, + mock(Clock.class), + new CompletableFuture<>()) + .toCompletableFuture(); assertThat(channel.outboundMessages(), hasSize(1)); assertThat(channel.outboundMessages().poll(), instanceOf(HelloMessage.class)); assertEquals(1, messageDispatcher.queuedHandlersCount()); - assertFalse(promise.isDone()); + assertFalse(future.isDone()); messageDispatcher.handleFailureMessage( new GqlError("Neo.TransientError.General.DatabaseUnavailable", "Error!")); - assertTrue(promise.isDone()); - assertFalse(promise.isSuccess()); + assertTrue(future.isDone()); + assertTrue(future.isCompletedExceptionally()); } @Test diff --git a/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v41/BoltProtocolV41Test.java b/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v41/BoltProtocolV41Test.java index d8c4fb015f..8acc4d3e34 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v41/BoltProtocolV41Test.java +++ b/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v41/BoltProtocolV41Test.java @@ -112,27 +112,27 @@ void shouldCreateMessageFormat() { @Test void shouldInitializeChannel() { - var promise = channel.newPromise(); var clock = mock(Clock.class); var time = 1L; when(clock.millis()).thenReturn(time); var latestAuthMillisFuture = new CompletableFuture(); - protocol.initializeChannel( - "MyDriver/0.0.1", - null, - Collections.emptyMap(), - RoutingContext.EMPTY, - promise, - null, - clock, - latestAuthMillisFuture); + var future = protocol.initializeChannel( + channel, + "MyDriver/0.0.1", + null, + Collections.emptyMap(), + RoutingContext.EMPTY, + null, + clock, + latestAuthMillisFuture) + .toCompletableFuture(); assertThat(channel.outboundMessages(), hasSize(1)); assertThat(channel.outboundMessages().poll(), instanceOf(HelloMessage.class)); assertEquals(1, messageDispatcher.queuedHandlersCount()); - assertFalse(promise.isDone()); + assertFalse(future.isDone()); var metadata = Map.of( "server", value("Neo4j/3.5.0"), @@ -140,8 +140,8 @@ void shouldInitializeChannel() { messageDispatcher.handleSuccessMessage(metadata); - assertTrue(promise.isDone()); - assertTrue(promise.isSuccess()); + assertTrue(future.isDone()); + assertEquals(channel, future.join()); verify(clock).millis(); assertTrue(latestAuthMillisFuture.isDone()); assertEquals(time, latestAuthMillisFuture.join()); @@ -149,28 +149,27 @@ void shouldInitializeChannel() { @Test void shouldFailToInitializeChannelWhenErrorIsReceived() { - var promise = channel.newPromise(); - - protocol.initializeChannel( - "MyDriver/2.2.1", - null, - Collections.emptyMap(), - RoutingContext.EMPTY, - promise, - null, - mock(Clock.class), - new CompletableFuture<>()); + var future = protocol.initializeChannel( + channel, + "MyDriver/2.2.1", + null, + Collections.emptyMap(), + RoutingContext.EMPTY, + null, + mock(Clock.class), + new CompletableFuture<>()) + .toCompletableFuture(); assertThat(channel.outboundMessages(), hasSize(1)); assertThat(channel.outboundMessages().poll(), instanceOf(HelloMessage.class)); assertEquals(1, messageDispatcher.queuedHandlersCount()); - assertFalse(promise.isDone()); + assertFalse(future.isDone()); messageDispatcher.handleFailureMessage( new GqlError("Neo.TransientError.General.DatabaseUnavailable", "Error!")); - assertTrue(promise.isDone()); - assertFalse(promise.isSuccess()); + assertTrue(future.isDone()); + assertTrue(future.isCompletedExceptionally()); } @Test diff --git a/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v42/BoltProtocolV42Test.java b/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v42/BoltProtocolV42Test.java index 45bed5cf71..6e9c923898 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v42/BoltProtocolV42Test.java +++ b/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v42/BoltProtocolV42Test.java @@ -112,27 +112,27 @@ void shouldCreateMessageFormat() { @Test void shouldInitializeChannel() { - var promise = channel.newPromise(); var clock = mock(Clock.class); var time = 1L; when(clock.millis()).thenReturn(time); var latestAuthMillisFuture = new CompletableFuture(); - protocol.initializeChannel( - "MyDriver/0.0.1", - null, - Collections.emptyMap(), - RoutingContext.EMPTY, - promise, - null, - clock, - latestAuthMillisFuture); + var future = protocol.initializeChannel( + channel, + "MyDriver/0.0.1", + null, + Collections.emptyMap(), + RoutingContext.EMPTY, + null, + clock, + latestAuthMillisFuture) + .toCompletableFuture(); assertThat(channel.outboundMessages(), hasSize(1)); assertThat(channel.outboundMessages().poll(), instanceOf(HelloMessage.class)); assertEquals(1, messageDispatcher.queuedHandlersCount()); - assertFalse(promise.isDone()); + assertFalse(future.isDone()); var metadata = Map.of( "server", value("Neo4j/3.5.0"), @@ -140,8 +140,8 @@ void shouldInitializeChannel() { messageDispatcher.handleSuccessMessage(metadata); - assertTrue(promise.isDone()); - assertTrue(promise.isSuccess()); + assertTrue(future.isDone()); + assertEquals(channel, future.join()); verify(clock).millis(); assertTrue(latestAuthMillisFuture.isDone()); assertEquals(time, latestAuthMillisFuture.join()); @@ -149,28 +149,27 @@ void shouldInitializeChannel() { @Test void shouldFailToInitializeChannelWhenErrorIsReceived() { - var promise = channel.newPromise(); - - protocol.initializeChannel( - "MyDriver/2.2.1", - null, - Collections.emptyMap(), - RoutingContext.EMPTY, - promise, - null, - mock(Clock.class), - new CompletableFuture<>()); + var future = protocol.initializeChannel( + channel, + "MyDriver/2.2.1", + null, + Collections.emptyMap(), + RoutingContext.EMPTY, + null, + mock(Clock.class), + new CompletableFuture<>()) + .toCompletableFuture(); assertThat(channel.outboundMessages(), hasSize(1)); assertThat(channel.outboundMessages().poll(), instanceOf(HelloMessage.class)); assertEquals(1, messageDispatcher.queuedHandlersCount()); - assertFalse(promise.isDone()); + assertFalse(future.isDone()); messageDispatcher.handleFailureMessage( new GqlError("Neo.TransientError.General.DatabaseUnavailable", "Error!")); - assertTrue(promise.isDone()); - assertFalse(promise.isSuccess()); + assertTrue(future.isDone()); + assertTrue(future.isCompletedExceptionally()); } @Test diff --git a/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v43/BoltProtocolV43Test.java b/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v43/BoltProtocolV43Test.java index a2c2efcf89..36c0d708ba 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v43/BoltProtocolV43Test.java +++ b/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v43/BoltProtocolV43Test.java @@ -115,27 +115,27 @@ void shouldCreateMessageFormat() { @Test void shouldInitializeChannel() { - var promise = channel.newPromise(); var clock = mock(Clock.class); var time = 1L; when(clock.millis()).thenReturn(time); var latestAuthMillisFuture = new CompletableFuture(); - protocol.initializeChannel( - "MyDriver/0.0.1", - null, - Collections.emptyMap(), - RoutingContext.EMPTY, - promise, - null, - clock, - latestAuthMillisFuture); + var future = protocol.initializeChannel( + channel, + "MyDriver/0.0.1", + null, + Collections.emptyMap(), + RoutingContext.EMPTY, + null, + clock, + latestAuthMillisFuture) + .toCompletableFuture(); assertThat(channel.outboundMessages(), hasSize(1)); assertThat(channel.outboundMessages().poll(), instanceOf(HelloMessage.class)); assertEquals(1, messageDispatcher.queuedHandlersCount()); - assertFalse(promise.isDone()); + assertFalse(future.isDone()); var metadata = Map.of( "server", value("Neo4j/3.5.0"), @@ -143,8 +143,8 @@ void shouldInitializeChannel() { messageDispatcher.handleSuccessMessage(metadata); - assertTrue(promise.isDone()); - assertTrue(promise.isSuccess()); + assertTrue(future.isDone()); + assertEquals(channel, future.join()); verify(clock).millis(); assertTrue(latestAuthMillisFuture.isDone()); assertEquals(time, latestAuthMillisFuture.join()); @@ -152,28 +152,27 @@ void shouldInitializeChannel() { @Test void shouldFailToInitializeChannelWhenErrorIsReceived() { - var promise = channel.newPromise(); - - protocol.initializeChannel( - "MyDriver/2.2.1", - null, - Collections.emptyMap(), - RoutingContext.EMPTY, - promise, - null, - mock(Clock.class), - new CompletableFuture<>()); + var future = protocol.initializeChannel( + channel, + "MyDriver/2.2.1", + null, + Collections.emptyMap(), + RoutingContext.EMPTY, + null, + mock(Clock.class), + new CompletableFuture<>()) + .toCompletableFuture(); assertThat(channel.outboundMessages(), hasSize(1)); assertThat(channel.outboundMessages().poll(), instanceOf(HelloMessage.class)); assertEquals(1, messageDispatcher.queuedHandlersCount()); - assertFalse(promise.isDone()); + assertFalse(future.isDone()); messageDispatcher.handleFailureMessage( new GqlError("Neo.TransientError.General.DatabaseUnavailable", "Error!")); - assertTrue(promise.isDone()); - assertFalse(promise.isSuccess()); + assertTrue(future.isDone()); + assertTrue(future.isCompletedExceptionally()); } @Test diff --git a/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v44/BoltProtocolV44Test.java b/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v44/BoltProtocolV44Test.java index 2663a1fe40..113aff1623 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v44/BoltProtocolV44Test.java +++ b/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v44/BoltProtocolV44Test.java @@ -117,27 +117,27 @@ void shouldCreateMessageFormat() { @Test void shouldInitializeChannel() { - var promise = channel.newPromise(); var clock = mock(Clock.class); var time = 1L; when(clock.millis()).thenReturn(time); var latestAuthMillisFuture = new CompletableFuture(); - protocol.initializeChannel( - "MyDriver/0.0.1", - null, - Collections.emptyMap(), - RoutingContext.EMPTY, - promise, - null, - clock, - latestAuthMillisFuture); + var future = protocol.initializeChannel( + channel, + "MyDriver/0.0.1", + null, + Collections.emptyMap(), + RoutingContext.EMPTY, + null, + clock, + latestAuthMillisFuture) + .toCompletableFuture(); assertThat(channel.outboundMessages(), hasSize(1)); assertThat(channel.outboundMessages().poll(), instanceOf(HelloMessage.class)); assertEquals(1, messageDispatcher.queuedHandlersCount()); - assertFalse(promise.isDone()); + assertFalse(future.isDone()); var metadata = Map.of( "server", value("Neo4j/3.5.0"), @@ -145,8 +145,8 @@ void shouldInitializeChannel() { messageDispatcher.handleSuccessMessage(metadata); - assertTrue(promise.isDone()); - assertTrue(promise.isSuccess()); + assertTrue(future.isDone()); + assertEquals(channel, future.join()); verify(clock).millis(); assertTrue(latestAuthMillisFuture.isDone()); assertEquals(time, latestAuthMillisFuture.join()); @@ -154,28 +154,27 @@ void shouldInitializeChannel() { @Test void shouldFailToInitializeChannelWhenErrorIsReceived() { - var promise = channel.newPromise(); - - protocol.initializeChannel( - "MyDriver/2.2.1", - null, - Collections.emptyMap(), - RoutingContext.EMPTY, - promise, - null, - mock(Clock.class), - new CompletableFuture<>()); + var future = protocol.initializeChannel( + channel, + "MyDriver/2.2.1", + null, + Collections.emptyMap(), + RoutingContext.EMPTY, + null, + mock(Clock.class), + new CompletableFuture<>()) + .toCompletableFuture(); assertThat(channel.outboundMessages(), hasSize(1)); assertThat(channel.outboundMessages().poll(), instanceOf(HelloMessage.class)); assertEquals(1, messageDispatcher.queuedHandlersCount()); - assertFalse(promise.isDone()); + assertFalse(future.isDone()); messageDispatcher.handleFailureMessage( new GqlError("Neo.TransientError.General.DatabaseUnavailable", "Error!")); - assertTrue(promise.isDone()); - assertFalse(promise.isSuccess()); + assertTrue(future.isDone()); + assertTrue(future.isCompletedExceptionally()); } @Test diff --git a/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v5/BoltProtocolV5Test.java b/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v5/BoltProtocolV5Test.java index 32a0edf1b7..aca75ed2ca 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v5/BoltProtocolV5Test.java +++ b/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v5/BoltProtocolV5Test.java @@ -117,27 +117,27 @@ void shouldCreateMessageFormat() { @Test void shouldInitializeChannel() { - var promise = channel.newPromise(); var clock = mock(Clock.class); var time = 1L; when(clock.millis()).thenReturn(time); var latestAuthMillisFuture = new CompletableFuture(); - protocol.initializeChannel( - "MyDriver/0.0.1", - null, - Collections.emptyMap(), - RoutingContext.EMPTY, - promise, - null, - clock, - latestAuthMillisFuture); + var future = protocol.initializeChannel( + channel, + "MyDriver/0.0.1", + null, + Collections.emptyMap(), + RoutingContext.EMPTY, + null, + clock, + latestAuthMillisFuture) + .toCompletableFuture(); assertThat(channel.outboundMessages(), hasSize(1)); assertThat(channel.outboundMessages().poll(), instanceOf(HelloMessage.class)); assertEquals(1, messageDispatcher.queuedHandlersCount()); - assertFalse(promise.isDone()); + assertFalse(future.isDone()); var metadata = Map.of( "server", value("Neo4j/3.5.0"), @@ -145,8 +145,8 @@ void shouldInitializeChannel() { messageDispatcher.handleSuccessMessage(metadata); - assertTrue(promise.isDone()); - assertTrue(promise.isSuccess()); + assertTrue(future.isDone()); + assertEquals(channel, future.join()); verify(clock).millis(); assertTrue(latestAuthMillisFuture.isDone()); assertEquals(time, latestAuthMillisFuture.join()); @@ -154,28 +154,27 @@ void shouldInitializeChannel() { @Test void shouldFailToInitializeChannelWhenErrorIsReceived() { - var promise = channel.newPromise(); - - protocol.initializeChannel( - "MyDriver/2.2.1", - null, - Collections.emptyMap(), - RoutingContext.EMPTY, - promise, - null, - mock(Clock.class), - new CompletableFuture<>()); + var future = protocol.initializeChannel( + channel, + "MyDriver/2.2.1", + null, + Collections.emptyMap(), + RoutingContext.EMPTY, + null, + mock(Clock.class), + new CompletableFuture<>()) + .toCompletableFuture(); assertThat(channel.outboundMessages(), hasSize(1)); assertThat(channel.outboundMessages().poll(), instanceOf(HelloMessage.class)); assertEquals(1, messageDispatcher.queuedHandlersCount()); - assertFalse(promise.isDone()); + assertFalse(future.isDone()); messageDispatcher.handleFailureMessage( new GqlError("Neo.TransientError.General.DatabaseUnavailable", "Error!")); - assertTrue(promise.isDone()); - assertFalse(promise.isSuccess()); + assertTrue(future.isDone()); + assertTrue(future.isCompletedExceptionally()); } @Test diff --git a/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v51/BoltProtocolV51Test.java b/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v51/BoltProtocolV51Test.java index a6b2f17e2d..2547620423 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v51/BoltProtocolV51Test.java +++ b/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v51/BoltProtocolV51Test.java @@ -117,28 +117,28 @@ void shouldCreateMessageFormat() { @Test void shouldInitializeChannel() { - var promise = channel.newPromise(); var clock = mock(Clock.class); var time = 1L; when(clock.millis()).thenReturn(time); var latestAuthMillisFuture = new CompletableFuture(); - protocol.initializeChannel( - "MyDriver/0.0.1", - null, - Collections.emptyMap(), - RoutingContext.EMPTY, - promise, - null, - clock, - latestAuthMillisFuture); + var future = protocol.initializeChannel( + channel, + "MyDriver/0.0.1", + null, + Collections.emptyMap(), + RoutingContext.EMPTY, + null, + clock, + latestAuthMillisFuture) + .toCompletableFuture(); assertThat(channel.outboundMessages(), hasSize(2)); assertThat(channel.outboundMessages().poll(), instanceOf(HelloMessage.class)); assertThat(channel.outboundMessages().poll(), instanceOf(LogonMessage.class)); assertEquals(2, messageDispatcher.queuedHandlersCount()); - assertFalse(promise.isDone()); + assertFalse(future.isDone()); var metadata = Map.of( "server", value("Neo4j/3.5.0"), @@ -147,8 +147,8 @@ void shouldInitializeChannel() { messageDispatcher.handleSuccessMessage(metadata); messageDispatcher.handleSuccessMessage(Collections.emptyMap()); - assertTrue(promise.isDone()); - assertTrue(promise.isSuccess()); + assertTrue(future.isDone()); + assertEquals(channel, future.join()); verify(clock).millis(); assertTrue(latestAuthMillisFuture.isDone()); assertEquals(time, latestAuthMillisFuture.join()); diff --git a/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v52/BoltProtocolV52Test.java b/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v52/BoltProtocolV52Test.java index b7cd29fcde..16dcccd8b5 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v52/BoltProtocolV52Test.java +++ b/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v52/BoltProtocolV52Test.java @@ -118,28 +118,28 @@ void shouldCreateMessageFormat() { @Test void shouldInitializeChannel() { - var promise = channel.newPromise(); var clock = mock(Clock.class); var time = 1L; when(clock.millis()).thenReturn(time); var latestAuthMillisFuture = new CompletableFuture(); - protocol.initializeChannel( - "MyDriver/0.0.1", - null, - Collections.emptyMap(), - RoutingContext.EMPTY, - promise, - null, - clock, - latestAuthMillisFuture); + var future = protocol.initializeChannel( + channel, + "MyDriver/0.0.1", + null, + Collections.emptyMap(), + RoutingContext.EMPTY, + null, + clock, + latestAuthMillisFuture) + .toCompletableFuture(); assertThat(channel.outboundMessages(), hasSize(2)); assertThat(channel.outboundMessages().poll(), instanceOf(HelloMessage.class)); assertThat(channel.outboundMessages().poll(), instanceOf(LogonMessage.class)); assertEquals(2, messageDispatcher.queuedHandlersCount()); - assertFalse(promise.isDone()); + assertFalse(future.isDone()); var metadata = Map.of( "server", value("Neo4j/3.5.0"), @@ -148,8 +148,8 @@ void shouldInitializeChannel() { messageDispatcher.handleSuccessMessage(metadata); messageDispatcher.handleSuccessMessage(Collections.emptyMap()); - assertTrue(promise.isDone()); - assertTrue(promise.isSuccess()); + assertTrue(future.isDone()); + assertEquals(channel, future.join()); verify(clock).millis(); assertTrue(latestAuthMillisFuture.isDone()); assertEquals(time, latestAuthMillisFuture.join()); diff --git a/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v53/BoltProtocolV53Test.java b/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v53/BoltProtocolV53Test.java index 6ef0d8ce0c..50429c204c 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v53/BoltProtocolV53Test.java +++ b/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v53/BoltProtocolV53Test.java @@ -118,28 +118,28 @@ void shouldCreateMessageFormat() { @Test void shouldInitializeChannel() { - var promise = channel.newPromise(); var clock = mock(Clock.class); var time = 1L; when(clock.millis()).thenReturn(time); var latestAuthMillisFuture = new CompletableFuture(); - protocol.initializeChannel( - "MyDriver/0.0.1", - null, - Collections.emptyMap(), - RoutingContext.EMPTY, - promise, - null, - clock, - latestAuthMillisFuture); + var future = protocol.initializeChannel( + channel, + "MyDriver/0.0.1", + null, + Collections.emptyMap(), + RoutingContext.EMPTY, + null, + clock, + latestAuthMillisFuture) + .toCompletableFuture(); assertThat(channel.outboundMessages(), hasSize(2)); assertThat(channel.outboundMessages().poll(), instanceOf(HelloMessage.class)); assertThat(channel.outboundMessages().poll(), instanceOf(LogonMessage.class)); assertEquals(2, messageDispatcher.queuedHandlersCount()); - assertFalse(promise.isDone()); + assertFalse(future.isDone()); var metadata = Map.of( "server", value("Neo4j/3.5.0"), @@ -148,8 +148,8 @@ void shouldInitializeChannel() { messageDispatcher.handleSuccessMessage(metadata); messageDispatcher.handleSuccessMessage(Collections.emptyMap()); - assertTrue(promise.isDone()); - assertTrue(promise.isSuccess()); + assertTrue(future.isDone()); + assertEquals(channel, future.join()); verify(clock).millis(); assertTrue(latestAuthMillisFuture.isDone()); assertEquals(time, latestAuthMillisFuture.join()); diff --git a/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v54/BoltProtocolV54Test.java b/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v54/BoltProtocolV54Test.java index ce190202ef..2f3d1ce69f 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v54/BoltProtocolV54Test.java +++ b/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v54/BoltProtocolV54Test.java @@ -118,28 +118,28 @@ void shouldCreateMessageFormat() { @Test void shouldInitializeChannel() { - var promise = channel.newPromise(); var clock = mock(Clock.class); var time = 1L; when(clock.millis()).thenReturn(time); var latestAuthMillisFuture = new CompletableFuture(); - protocol.initializeChannel( - "MyDriver/0.0.1", - null, - Collections.emptyMap(), - RoutingContext.EMPTY, - promise, - null, - clock, - latestAuthMillisFuture); + var future = protocol.initializeChannel( + channel, + "MyDriver/0.0.1", + null, + Collections.emptyMap(), + RoutingContext.EMPTY, + null, + clock, + latestAuthMillisFuture) + .toCompletableFuture(); assertThat(channel.outboundMessages(), hasSize(2)); assertThat(channel.outboundMessages().poll(), instanceOf(HelloMessage.class)); assertThat(channel.outboundMessages().poll(), instanceOf(LogonMessage.class)); assertEquals(2, messageDispatcher.queuedHandlersCount()); - assertFalse(promise.isDone()); + assertFalse(future.isDone()); var metadata = Map.of( "server", value("Neo4j/3.5.0"), @@ -148,8 +148,8 @@ void shouldInitializeChannel() { messageDispatcher.handleSuccessMessage(metadata); messageDispatcher.handleSuccessMessage(Collections.emptyMap()); - assertTrue(promise.isDone()); - assertTrue(promise.isSuccess()); + assertTrue(future.isDone()); + assertEquals(channel, future.join()); verify(clock).millis(); assertTrue(latestAuthMillisFuture.isDone()); assertEquals(time, latestAuthMillisFuture.join()); diff --git a/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v55/BoltProtocolV55Test.java b/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v55/BoltProtocolV55Test.java index 9fbd93b085..036a3a65da 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v55/BoltProtocolV55Test.java +++ b/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v55/BoltProtocolV55Test.java @@ -113,28 +113,28 @@ void shouldCreateMessageFormat() { @Test void shouldInitializeChannel() { - var promise = channel.newPromise(); var clock = mock(Clock.class); var time = 1L; when(clock.millis()).thenReturn(time); var latestAuthMillisFuture = new CompletableFuture(); - protocol.initializeChannel( - "MyDriver/0.0.1", - null, - Collections.emptyMap(), - RoutingContext.EMPTY, - promise, - null, - clock, - latestAuthMillisFuture); + var future = protocol.initializeChannel( + channel, + "MyDriver/0.0.1", + null, + Collections.emptyMap(), + RoutingContext.EMPTY, + null, + clock, + latestAuthMillisFuture) + .toCompletableFuture(); assertThat(channel.outboundMessages(), hasSize(2)); assertThat(channel.outboundMessages().poll(), instanceOf(HelloMessage.class)); assertThat(channel.outboundMessages().poll(), instanceOf(LogonMessage.class)); assertEquals(2, messageDispatcher.queuedHandlersCount()); - assertFalse(promise.isDone()); + assertFalse(future.isDone()); var metadata = Map.of( "server", value("Neo4j/3.5.0"), @@ -143,8 +143,8 @@ void shouldInitializeChannel() { messageDispatcher.handleSuccessMessage(metadata); messageDispatcher.handleSuccessMessage(Collections.emptyMap()); - assertTrue(promise.isDone()); - assertTrue(promise.isSuccess()); + assertTrue(future.isDone()); + assertEquals(channel, future.join()); verify(clock).millis(); assertTrue(latestAuthMillisFuture.isDone()); assertEquals(time, latestAuthMillisFuture.join()); diff --git a/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v56/BoltProtocolV56Test.java b/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v56/BoltProtocolV56Test.java index f9ea151573..070e537825 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v56/BoltProtocolV56Test.java +++ b/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v56/BoltProtocolV56Test.java @@ -113,28 +113,28 @@ void shouldCreateMessageFormat() { @Test void shouldInitializeChannel() { - var promise = channel.newPromise(); var clock = mock(Clock.class); var time = 1L; when(clock.millis()).thenReturn(time); var latestAuthMillisFuture = new CompletableFuture(); - protocol.initializeChannel( - "MyDriver/0.0.1", - null, - Collections.emptyMap(), - RoutingContext.EMPTY, - promise, - null, - clock, - latestAuthMillisFuture); + var future = protocol.initializeChannel( + channel, + "MyDriver/0.0.1", + null, + Collections.emptyMap(), + RoutingContext.EMPTY, + null, + clock, + latestAuthMillisFuture) + .toCompletableFuture(); assertThat(channel.outboundMessages(), hasSize(2)); assertThat(channel.outboundMessages().poll(), instanceOf(HelloMessage.class)); assertThat(channel.outboundMessages().poll(), instanceOf(LogonMessage.class)); assertEquals(2, messageDispatcher.queuedHandlersCount()); - assertFalse(promise.isDone()); + assertFalse(future.isDone()); var metadata = Map.of( "server", value("Neo4j/3.5.0"), @@ -143,8 +143,8 @@ void shouldInitializeChannel() { messageDispatcher.handleSuccessMessage(metadata); messageDispatcher.handleSuccessMessage(Collections.emptyMap()); - assertTrue(promise.isDone()); - assertTrue(promise.isSuccess()); + assertTrue(future.isDone()); + assertEquals(channel, future.join()); verify(clock).millis(); assertTrue(latestAuthMillisFuture.isDone()); assertEquals(time, latestAuthMillisFuture.join()); diff --git a/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v57/BoltProtocolV57Test.java b/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v57/BoltProtocolV57Test.java index 45154f1322..383b22515a 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v57/BoltProtocolV57Test.java +++ b/driver/src/test/java/org/neo4j/driver/internal/bolt/basicimpl/messaging/v57/BoltProtocolV57Test.java @@ -112,28 +112,28 @@ void shouldCreateMessageFormat() { @Test void shouldInitializeChannel() { - var promise = channel.newPromise(); var clock = mock(Clock.class); var time = 1L; when(clock.millis()).thenReturn(time); var latestAuthMillisFuture = new CompletableFuture(); - protocol.initializeChannel( - "MyDriver/0.0.1", - null, - Collections.emptyMap(), - RoutingContext.EMPTY, - promise, - null, - clock, - latestAuthMillisFuture); + var future = protocol.initializeChannel( + channel, + "MyDriver/0.0.1", + null, + Collections.emptyMap(), + RoutingContext.EMPTY, + null, + clock, + latestAuthMillisFuture) + .toCompletableFuture(); assertThat(channel.outboundMessages(), hasSize(2)); assertThat(channel.outboundMessages().poll(), instanceOf(HelloMessage.class)); assertThat(channel.outboundMessages().poll(), instanceOf(LogonMessage.class)); assertEquals(2, messageDispatcher.queuedHandlersCount()); - assertFalse(promise.isDone()); + assertFalse(future.isDone()); var metadata = Map.of( "server", value("Neo4j/3.5.0"), @@ -142,8 +142,8 @@ void shouldInitializeChannel() { messageDispatcher.handleSuccessMessage(metadata); messageDispatcher.handleSuccessMessage(Collections.emptyMap()); - assertTrue(promise.isDone()); - assertTrue(promise.isSuccess()); + assertTrue(future.isDone()); + assertEquals(channel, future.join()); verify(clock).millis(); assertTrue(latestAuthMillisFuture.isDone()); assertEquals(time, latestAuthMillisFuture.join());