diff --git a/bolt-api-netty/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/NettyBoltConnectionProvider.java b/bolt-api-netty/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/NettyBoltConnectionProvider.java index 40c3add98e..0118675086 100644 --- a/bolt-api-netty/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/NettyBoltConnectionProvider.java +++ b/bolt-api-netty/src/main/java/org/neo4j/driver/internal/bolt/basicimpl/NettyBoltConnectionProvider.java @@ -58,27 +58,21 @@ public final class NettyBoltConnectionProvider implements BoltConnectionProvider { private final LoggingProvider logging; private final System.Logger log; - private final ConnectionProvider connectionProvider; - - private BoltServerAddress address; - - private RoutingContext routingContext; - private BoltAgent boltAgent; - private String userAgent; - private int connectTimeoutMillis; - private CompletableFuture closeFuture; - private MetricsListener metricsListener; + private final MetricsListener metricsListener; private final Clock clock; private final ValueFactory valueFactory; + private CompletableFuture closeFuture; + public NettyBoltConnectionProvider( EventLoopGroup eventLoopGroup, Clock clock, DomainNameResolver domainNameResolver, LocalAddress localAddress, LoggingProvider logging, - ValueFactory valueFactory) { + ValueFactory valueFactory, + MetricsListener metricsListener) { Objects.requireNonNull(eventLoopGroup); this.clock = Objects.requireNonNull(clock); this.logging = Objects.requireNonNull(logging); @@ -86,28 +80,17 @@ public NettyBoltConnectionProvider( this.connectionProvider = ConnectionProviders.netty( eventLoopGroup, clock, domainNameResolver, localAddress, logging, valueFactory); this.valueFactory = Objects.requireNonNull(valueFactory); + this.metricsListener = NoopMetricsListener.getInstance(); + InternalLoggerFactory.setDefaultFactory(new NettyLogging(logging)); } @Override - public CompletionStage init( + public CompletionStage connect( BoltServerAddress address, RoutingContext routingContext, BoltAgent boltAgent, String userAgent, int connectTimeoutMillis, - MetricsListener metricsListener) { - this.address = address; - this.routingContext = routingContext; - this.boltAgent = boltAgent; - this.userAgent = userAgent; - this.connectTimeoutMillis = connectTimeoutMillis; - this.metricsListener = NoopMetricsListener.getInstance(); - InternalLoggerFactory.setDefaultFactory(new NettyLogging(logging)); - return CompletableFuture.completedStage(null); - } - - @Override - public CompletionStage connect( SecurityPlan securityPlan, DatabaseName databaseName, Supplier> authTokenStageSupplier, @@ -180,8 +163,20 @@ public CompletionStage connect( } @Override - public CompletionStage verifyConnectivity(SecurityPlan securityPlan, AuthToken authToken) { + public CompletionStage verifyConnectivity( + BoltServerAddress address, + RoutingContext routingContext, + BoltAgent boltAgent, + String userAgent, + int connectTimeoutMillis, + SecurityPlan securityPlan, + AuthToken authToken) { return connect( + address, + routingContext, + boltAgent, + userAgent, + connectTimeoutMillis, securityPlan, null, () -> CompletableFuture.completedStage(authToken), @@ -196,8 +191,20 @@ public CompletionStage verifyConnectivity(SecurityPlan securityPlan, AuthT } @Override - public CompletionStage supportsMultiDb(SecurityPlan securityPlan, AuthToken authToken) { + public CompletionStage supportsMultiDb( + BoltServerAddress address, + RoutingContext routingContext, + BoltAgent boltAgent, + String userAgent, + int connectTimeoutMillis, + SecurityPlan securityPlan, + AuthToken authToken) { return connect( + address, + routingContext, + boltAgent, + userAgent, + connectTimeoutMillis, securityPlan, null, () -> CompletableFuture.completedStage(authToken), @@ -215,8 +222,20 @@ public CompletionStage supportsMultiDb(SecurityPlan securityPlan, AuthT } @Override - public CompletionStage supportsSessionAuth(SecurityPlan securityPlan, AuthToken authToken) { + public CompletionStage supportsSessionAuth( + BoltServerAddress address, + RoutingContext routingContext, + BoltAgent boltAgent, + String userAgent, + int connectTimeoutMillis, + SecurityPlan securityPlan, + AuthToken authToken) { return connect( + address, + routingContext, + boltAgent, + userAgent, + connectTimeoutMillis, securityPlan, null, () -> CompletableFuture.completedStage(authToken), diff --git a/bolt-api-pooled/src/main/java/org/neo4j/driver/internal/bolt/pooledimpl/PooledBoltConnectionProvider.java b/bolt-api-pooled/src/main/java/org/neo4j/driver/internal/bolt/pooledimpl/PooledBoltConnectionProvider.java index f367c4d237..c684b777cc 100644 --- a/bolt-api-pooled/src/main/java/org/neo4j/driver/internal/bolt/pooledimpl/PooledBoltConnectionProvider.java +++ b/bolt-api-pooled/src/main/java/org/neo4j/driver/internal/bolt/pooledimpl/PooledBoltConnectionProvider.java @@ -66,11 +66,15 @@ public class PooledBoltConnectionProvider implements BoltConnectionProvider { private final long maxLifetime; private final long idleBeforeTest; private final Clock clock; - private MetricsListener metricsListener; - private CompletionStage closeStage; - private BoltServerAddress address; - private String poolId; + private final MetricsListener metricsListener; + private final BoltServerAddress address; + private final RoutingContext routingContext; + private final BoltAgent boltAgent; + private final String userAgent; + private final int connectTimeoutMillis; + private final String poolId; + private CompletionStage closeStage; private long minAuthTimestamp; public PooledBoltConnectionProvider( @@ -80,7 +84,13 @@ public PooledBoltConnectionProvider( long maxLifetime, long idleBeforeTest, Clock clock, - LoggingProvider logging) { + LoggingProvider logging, + MetricsListener metricsListener, + BoltServerAddress address, + RoutingContext routingContext, + BoltAgent boltAgent, + String userAgent, + int connectTimeoutMillis) { this.boltConnectionProvider = boltConnectionProvider; this.pooledConnectionEntries = new ArrayList<>(); this.pendingAcquisitions = new ArrayDeque<>(100); @@ -90,19 +100,13 @@ public PooledBoltConnectionProvider( this.idleBeforeTest = idleBeforeTest; this.clock = Objects.requireNonNull(clock); this.log = logging.getLog(getClass()); - } - - @Override - public CompletionStage init( - BoltServerAddress address, - RoutingContext routingContext, - BoltAgent boltAgent, - String userAgent, - int connectTimeoutMillis, - MetricsListener metricsListener) { + this.metricsListener = Objects.requireNonNull(metricsListener); this.address = Objects.requireNonNull(address); + this.routingContext = Objects.requireNonNull(routingContext); + this.boltAgent = Objects.requireNonNull(boltAgent); + this.userAgent = Objects.requireNonNull(userAgent); + this.connectTimeoutMillis = connectTimeoutMillis; this.poolId = poolId(address); - this.metricsListener = Objects.requireNonNull(metricsListener); metricsListener.registerPoolMetrics( poolId, address, @@ -120,13 +124,16 @@ public CompletionStage init( .count(); } }); - return boltConnectionProvider.init( - address, routingContext, boltAgent, userAgent, connectTimeoutMillis, metricsListener); } @SuppressWarnings({"ReassignedVariable"}) @Override public CompletionStage connect( + BoltServerAddress ignoredAddress, + RoutingContext ignoredRoutingContext, + BoltAgent ignoredBoltAgent, + String ignoredUserAgent, + int ignoredConnectTimeoutMillis, SecurityPlan securityPlan, DatabaseName databaseName, Supplier> authTokenStageSupplier, @@ -334,6 +341,11 @@ private void connect( var entry = connectionEntryWithMetadata.connectionEntry; boltConnectionProvider .connect( + address, + routingContext, + boltAgent, + userAgent, + connectTimeoutMillis, securityPlan, databaseName, empty.get() @@ -502,8 +514,20 @@ private CompletionStage livenessCheckStage(ConnectionEntry entry) { } @Override - public CompletionStage verifyConnectivity(SecurityPlan securityPlan, AuthToken authToken) { + public CompletionStage verifyConnectivity( + BoltServerAddress ignoredAddress, + RoutingContext ignoredRoutingContext, + BoltAgent ignoredBoltAgent, + String ignoredUserAgent, + int ignoredConnectTimeoutMillis, + SecurityPlan securityPlan, + AuthToken authToken) { return connect( + address, + routingContext, + boltAgent, + userAgent, + connectTimeoutMillis, securityPlan, null, () -> CompletableFuture.completedStage(authToken), @@ -518,8 +542,20 @@ public CompletionStage verifyConnectivity(SecurityPlan securityPlan, AuthT } @Override - public CompletionStage supportsMultiDb(SecurityPlan securityPlan, AuthToken authToken) { + public CompletionStage supportsMultiDb( + BoltServerAddress ignoredAddress, + RoutingContext ignoredRoutingContext, + BoltAgent ignoredBoltAgent, + String ignoredUserAgent, + int ignoredConnectTimeoutMillis, + SecurityPlan securityPlan, + AuthToken authToken) { return connect( + address, + routingContext, + boltAgent, + userAgent, + connectTimeoutMillis, securityPlan, null, () -> CompletableFuture.completedStage(authToken), @@ -537,8 +573,20 @@ public CompletionStage supportsMultiDb(SecurityPlan securityPlan, AuthT } @Override - public CompletionStage supportsSessionAuth(SecurityPlan securityPlan, AuthToken authToken) { + public CompletionStage supportsSessionAuth( + BoltServerAddress ignoredAddress, + RoutingContext ignoredRoutingContext, + BoltAgent ignoredBoltAgent, + String ignoredUserAgent, + int ignoredConnectTimeoutMillis, + SecurityPlan securityPlan, + AuthToken authToken) { return connect( + address, + routingContext, + boltAgent, + userAgent, + connectTimeoutMillis, securityPlan, null, () -> CompletableFuture.completedStage(authToken), diff --git a/bolt-api-pooled/src/test/java/org/neo4j/driver/internal/bolt/pooledimpl/PooledBoltConnectionProviderTest.java b/bolt-api-pooled/src/test/java/org/neo4j/driver/internal/bolt/pooledimpl/PooledBoltConnectionProviderTest.java index 3ab5bc48ef..1e4281062d 100644 --- a/bolt-api-pooled/src/test/java/org/neo4j/driver/internal/bolt/pooledimpl/PooledBoltConnectionProviderTest.java +++ b/bolt-api-pooled/src/test/java/org/neo4j/driver/internal/bolt/pooledimpl/PooledBoltConnectionProviderTest.java @@ -119,20 +119,30 @@ void beforeEach() { given(authTokenStageSupplier.get()) .willReturn(CompletableFuture.completedStage(AuthTokens.custom(Collections.emptyMap()))); provider = new PooledBoltConnectionProvider( - upstreamProvider, maxSize, acquisitionTimeout, maxLifetime, idleBeforeTest, clock, loggingProvider); - provider.init(address, context, boltAgent, userAgent, timeout, metricsListener); - } - - @Test - void shouldInitUpstream() { - // then - then(upstreamProvider).should().init(address, context, boltAgent, userAgent, timeout, metricsListener); + upstreamProvider, + maxSize, + acquisitionTimeout, + maxLifetime, + idleBeforeTest, + clock, + loggingProvider, + metricsListener, + address, + context, + boltAgent, + userAgent, + timeout); } @Test void shouldCreateNewConnection() { // given given(upstreamProvider.connect( + eq(address), + eq(context), + eq(boltAgent), + eq(userAgent), + eq(timeout), eq(securityPlan), eq(databaseName), any(), @@ -147,6 +157,11 @@ void shouldCreateNewConnection() { // when var connection = provider.connect( + address, + context, + boltAgent, + userAgent, + timeout, securityPlan, databaseName, authTokenStageSupplier, @@ -166,6 +181,11 @@ void shouldCreateNewConnection() { then(upstreamProvider) .should() .connect( + eq(address), + eq(context), + eq(boltAgent), + eq(userAgent), + eq(timeout), eq(securityPlan), eq(databaseName), any(), @@ -185,6 +205,11 @@ void shouldTimeout() { // given var acquisitionTimeout = TimeUnit.SECONDS.toMillis(1); given(upstreamProvider.connect( + eq(address), + eq(context), + eq(boltAgent), + eq(userAgent), + eq(timeout), eq(securityPlan), eq(databaseName), any(), @@ -197,9 +222,25 @@ void shouldTimeout() { any())) .willReturn(CompletableFuture.completedStage(connection)); provider = new PooledBoltConnectionProvider( - upstreamProvider, 1, acquisitionTimeout, maxLifetime, idleBeforeTest, clock, loggingProvider); - provider.init(address, context, boltAgent, userAgent, timeout, metricsListener); + upstreamProvider, + 1, + acquisitionTimeout, + maxLifetime, + idleBeforeTest, + clock, + loggingProvider, + metricsListener, + address, + context, + boltAgent, + userAgent, + timeout); provider.connect( + address, + context, + boltAgent, + userAgent, + timeout, securityPlan, databaseName, authTokenStageSupplier, @@ -215,6 +256,11 @@ void shouldTimeout() { // when var connectionStage = provider.connect( + address, + context, + boltAgent, + userAgent, + timeout, securityPlan, databaseName, authTokenStageSupplier, @@ -244,6 +290,11 @@ void shouldReturnConnectionToPool() { return CompletableFuture.completedStage(null); }); given(upstreamProvider.connect( + eq(address), + eq(context), + eq(boltAgent), + eq(userAgent), + eq(timeout), eq(securityPlan), eq(databaseName), any(), @@ -256,6 +307,11 @@ void shouldReturnConnectionToPool() { any())) .willReturn(CompletableFuture.completedStage(connection)); var connection = provider.connect( + address, + context, + boltAgent, + userAgent, + timeout, securityPlan, databaseName, authTokenStageSupplier, @@ -294,6 +350,11 @@ void shouldUseExistingConnection() { given(authInfo.authToken()).willReturn(AuthTokens.custom(Collections.emptyMap())); given(connection.authInfo()).willReturn(CompletableFuture.completedStage(authInfo)); given(upstreamProvider.connect( + eq(address), + eq(context), + eq(boltAgent), + eq(userAgent), + eq(timeout), eq(securityPlan), eq(databaseName), any(), @@ -306,6 +367,11 @@ void shouldUseExistingConnection() { any())) .willReturn(CompletableFuture.completedStage(connection)); provider.connect( + address, + context, + boltAgent, + userAgent, + timeout, securityPlan, databaseName, authTokenStageSupplier, @@ -325,6 +391,11 @@ void shouldUseExistingConnection() { // when var connection = provider.connect( + address, + context, + boltAgent, + userAgent, + timeout, securityPlan, databaseName, authTokenStageSupplier, @@ -360,6 +431,11 @@ void shouldClose() { given(connection.close()).willReturn(CompletableFuture.completedStage(null)); given(upstreamProvider.close()).willReturn(CompletableFuture.completedStage(null)); given(upstreamProvider.connect( + eq(address), + eq(context), + eq(boltAgent), + eq(userAgent), + eq(timeout), eq(securityPlan), eq(databaseName), any(), @@ -372,6 +448,11 @@ void shouldClose() { any())) .willReturn(CompletableFuture.completedStage(connection)); provider.connect( + address, + context, + boltAgent, + userAgent, + timeout, securityPlan, databaseName, authTokenStageSupplier, @@ -407,6 +488,11 @@ void shouldVerifyConnectivity() { return CompletableFuture.completedStage(null); }); given(upstreamProvider.connect( + eq(address), + eq(context), + eq(boltAgent), + eq(userAgent), + eq(timeout), eq(securityPlan), eq(null), any(), @@ -420,7 +506,14 @@ void shouldVerifyConnectivity() { .willReturn(CompletableFuture.completedStage(connection)); // when - provider.verifyConnectivity(SecurityPlan.INSECURE, AuthTokens.custom(Collections.emptyMap())) + provider.verifyConnectivity( + address, + context, + boltAgent, + userAgent, + timeout, + SecurityPlan.INSECURE, + AuthTokens.custom(Collections.emptyMap())) .toCompletableFuture() .join(); @@ -428,6 +521,11 @@ void shouldVerifyConnectivity() { then(upstreamProvider) .should() .connect( + eq(address), + eq(context), + eq(boltAgent), + eq(userAgent), + eq(timeout), eq(securityPlan), eq(null), any(), @@ -455,6 +553,11 @@ void shouldSupportMultiDb(BoltProtocolVersion boltProtocolVersion, boolean expec return CompletableFuture.completedStage(null); }); given(upstreamProvider.connect( + eq(address), + eq(context), + eq(boltAgent), + eq(userAgent), + eq(timeout), eq(securityPlan), eq(null), any(), @@ -468,7 +571,14 @@ void shouldSupportMultiDb(BoltProtocolVersion boltProtocolVersion, boolean expec .willReturn(CompletableFuture.completedStage(connection)); // when - var supports = provider.supportsMultiDb(SecurityPlan.INSECURE, AuthTokens.custom(Collections.emptyMap())) + var supports = provider.supportsMultiDb( + address, + context, + boltAgent, + userAgent, + timeout, + SecurityPlan.INSECURE, + AuthTokens.custom(Collections.emptyMap())) .toCompletableFuture() .join(); @@ -477,6 +587,11 @@ void shouldSupportMultiDb(BoltProtocolVersion boltProtocolVersion, boolean expec then(upstreamProvider) .should() .connect( + eq(address), + eq(context), + eq(boltAgent), + eq(userAgent), + eq(timeout), eq(securityPlan), eq(null), any(), @@ -510,6 +625,11 @@ void shouldSupportsSessionAuth(BoltProtocolVersion boltProtocolVersion, boolean return CompletableFuture.completedStage(null); }); given(upstreamProvider.connect( + eq(address), + eq(context), + eq(boltAgent), + eq(userAgent), + eq(timeout), eq(securityPlan), eq(null), any(), @@ -523,7 +643,14 @@ void shouldSupportsSessionAuth(BoltProtocolVersion boltProtocolVersion, boolean .willReturn(CompletableFuture.completedStage(connection)); // when - var supports = provider.supportsSessionAuth(SecurityPlan.INSECURE, AuthTokens.custom(Collections.emptyMap())) + var supports = provider.supportsSessionAuth( + address, + context, + boltAgent, + userAgent, + timeout, + SecurityPlan.INSECURE, + AuthTokens.custom(Collections.emptyMap())) .toCompletableFuture() .join(); @@ -532,6 +659,11 @@ void shouldSupportsSessionAuth(BoltProtocolVersion boltProtocolVersion, boolean then(upstreamProvider) .should() .connect( + eq(address), + eq(context), + eq(boltAgent), + eq(userAgent), + eq(timeout), eq(securityPlan), eq(null), any(), @@ -565,6 +697,11 @@ void shouldThrowOnLowerVersion() { return CompletableFuture.completedStage(null); }); given(upstreamProvider.connect( + eq(address), + eq(context), + eq(boltAgent), + eq(userAgent), + eq(timeout), eq(securityPlan), eq(databaseName), any(), @@ -577,6 +714,11 @@ void shouldThrowOnLowerVersion() { any())) .willReturn(CompletableFuture.completedStage(connection)); provider.connect( + address, + context, + boltAgent, + userAgent, + timeout, securityPlan, databaseName, authTokenStageSupplier, @@ -595,6 +737,11 @@ void shouldThrowOnLowerVersion() { // when var future = provider.connect( + address, + context, + boltAgent, + userAgent, + timeout, securityPlan, databaseName, authTokenStageSupplier, @@ -626,6 +773,11 @@ void shouldTestMaxLifetime() { }); var connection2 = mock(BoltConnection.class); given(upstreamProvider.connect( + eq(address), + eq(context), + eq(boltAgent), + eq(userAgent), + eq(timeout), eq(securityPlan), eq(databaseName), any(), @@ -639,6 +791,11 @@ void shouldTestMaxLifetime() { .willReturn(CompletableFuture.completedStage(connection)) .willReturn(CompletableFuture.completedStage(connection2)); provider.connect( + address, + context, + boltAgent, + userAgent, + timeout, securityPlan, databaseName, authTokenStageSupplier, @@ -658,6 +815,11 @@ void shouldTestMaxLifetime() { // when var anotherConnection = provider.connect( + address, + context, + boltAgent, + userAgent, + timeout, securityPlan, databaseName, authTokenStageSupplier, @@ -695,6 +857,11 @@ void shouldTestLiveness() { given(authInfo.authToken()).willReturn(AuthTokens.custom(Collections.emptyMap())); given(connection.authInfo()).willReturn(CompletableFuture.completedStage(authInfo)); given(upstreamProvider.connect( + eq(address), + eq(context), + eq(boltAgent), + eq(userAgent), + eq(timeout), eq(securityPlan), eq(databaseName), any(), @@ -707,6 +874,11 @@ void shouldTestLiveness() { any())) .willReturn(CompletableFuture.completedStage(connection)); provider.connect( + address, + context, + boltAgent, + userAgent, + timeout, securityPlan, databaseName, authTokenStageSupplier, @@ -726,6 +898,11 @@ void shouldTestLiveness() { // when var actualConnection = provider.connect( + address, + context, + boltAgent, + userAgent, + timeout, securityPlan, databaseName, authTokenStageSupplier, @@ -769,6 +946,11 @@ void shouldPipelineReauth() { .willReturn(CompletableFuture.completedStage(AuthTokens.custom(Collections.emptyMap()))) .willReturn(CompletableFuture.completedStage(authToken)); given(upstreamProvider.connect( + eq(address), + eq(context), + eq(boltAgent), + eq(userAgent), + eq(timeout), eq(securityPlan), eq(databaseName), any(), @@ -781,6 +963,11 @@ void shouldPipelineReauth() { any())) .willReturn(CompletableFuture.completedStage(connection)); provider.connect( + address, + context, + boltAgent, + userAgent, + timeout, securityPlan, databaseName, authTokenStageSupplier, @@ -799,6 +986,11 @@ void shouldPipelineReauth() { // when var actualConnection = provider.connect( + address, + context, + boltAgent, + userAgent, + timeout, securityPlan, databaseName, authTokenStageSupplier, diff --git a/bolt-api-routed/src/main/java/org/neo4j/driver/internal/bolt/routedimpl/RoutedBoltConnectionProvider.java b/bolt-api-routed/src/main/java/org/neo4j/driver/internal/bolt/routedimpl/RoutedBoltConnectionProvider.java index 15b3b7f939..ad0024de14 100644 --- a/bolt-api-routed/src/main/java/org/neo4j/driver/internal/bolt/routedimpl/RoutedBoltConnectionProvider.java +++ b/bolt-api-routed/src/main/java/org/neo4j/driver/internal/bolt/routedimpl/RoutedBoltConnectionProvider.java @@ -69,72 +69,64 @@ public class RoutedBoltConnectionProvider implements BoltConnectionProvider { "Failed to obtain connection towards %s server. Known routing table is: %s"; private static final String CONNECTION_ACQUISITION_ATTEMPT_FAILURE_MESSAGE = "Failed to obtain a connection towards address %s, will try other addresses if available. Complete failure is reported separately from this entry."; - private final LoggingProvider logging; private final System.Logger log; - private final Supplier boltConnectionProviderSupplier; - + private final Function boltConnectionProviderFunction; private final Map addressToProvider = new HashMap<>(); - private final Function> resolver; - private final DomainNameResolver domainNameResolver; private final Map addressToInUseCount = new HashMap<>(); - private final LoadBalancingStrategy loadBalancingStrategy; - private final long routingTablePurgeDelayMs; + private final RoutingTableRegistry registry; + private final RoutingContext routingContext; + private final BoltAgent boltAgent; + private final String userAgent; + private final int connectTimeoutMillis; private Rediscovery rediscovery; - private RoutingTableRegistry registry; - - private RoutingContext routingContext; - private BoltAgent boltAgent; - private String userAgent; - private int connectTimeoutMillis; private CompletableFuture closeFuture; - private final Clock clock; - private MetricsListener metricsListener; public RoutedBoltConnectionProvider( - Supplier boltConnectionProviderSupplier, + Function boltConnectionProviderFunction, Function> resolver, DomainNameResolver domainNameResolver, long routingTablePurgeDelayMs, Rediscovery rediscovery, Clock clock, - LoggingProvider logging) { - this.boltConnectionProviderSupplier = Objects.requireNonNull(boltConnectionProviderSupplier); - this.resolver = Objects.requireNonNull(resolver); - this.logging = Objects.requireNonNull(logging); - this.log = logging.getLog(getClass()); - this.loadBalancingStrategy = new LeastConnectedLoadBalancingStrategy(this::getInUseCount, logging); - this.domainNameResolver = Objects.requireNonNull(domainNameResolver); - this.routingTablePurgeDelayMs = routingTablePurgeDelayMs; - this.rediscovery = rediscovery; - this.clock = Objects.requireNonNull(clock); - } - - @Override - public synchronized CompletionStage init( + LoggingProvider logging, BoltServerAddress address, RoutingContext routingContext, BoltAgent boltAgent, String userAgent, int connectTimeoutMillis, MetricsListener metricsListener) { + this.boltConnectionProviderFunction = Objects.requireNonNull(boltConnectionProviderFunction); + this.log = logging.getLog(getClass()); + this.loadBalancingStrategy = new LeastConnectedLoadBalancingStrategy(this::getInUseCount, logging); + this.rediscovery = rediscovery; this.routingContext = routingContext; this.boltAgent = boltAgent; this.userAgent = userAgent; this.connectTimeoutMillis = connectTimeoutMillis; if (this.rediscovery == null) { - this.rediscovery = new RediscoveryImpl(address, resolver, logging, domainNameResolver); + this.rediscovery = new RediscoveryImpl( + address, + resolver, + logging, + domainNameResolver, + routingContext, + boltAgent, + userAgent, + connectTimeoutMillis); } this.registry = new RoutingTableRegistryImpl( - this::get, rediscovery, clock, logging, routingTablePurgeDelayMs, this::shutdownUnusedProviders); - this.metricsListener = Objects.requireNonNull(metricsListener); - - return CompletableFuture.completedStage(null); + this::get, this.rediscovery, clock, logging, routingTablePurgeDelayMs, this::shutdownUnusedProviders); } @Override public CompletionStage connect( + BoltServerAddress ignoredAddress, + RoutingContext ignoredRoutingContext, + BoltAgent ignoredBoltAgent, + String ignoredUserAgent, + int ignoredConnectTimeoutMillis, SecurityPlan securityPlan, DatabaseName databaseName, Supplier> authTokenStageSupplier, @@ -208,12 +200,19 @@ public CompletionStage connect( } @Override - public CompletionStage verifyConnectivity(SecurityPlan securityPlan, AuthToken authToken) { + public CompletionStage verifyConnectivity( + BoltServerAddress ignoredAddress, + RoutingContext ignoredRoutingContext, + BoltAgent ignoredBoltAgent, + String ignoredUserAgent, + int ignoredConnectTimeoutMillis, + SecurityPlan securityPlan, + AuthToken authToken) { RoutingTableRegistry registry; synchronized (this) { registry = this.registry; } - return supportsMultiDb(securityPlan, authToken) + return supportsMultiDb(null, null, null, null, 0, securityPlan, authToken) .thenCompose(supports -> registry.ensureRoutingTable( securityPlan, supports @@ -240,7 +239,14 @@ public CompletionStage verifyConnectivity(SecurityPlan securityPlan, AuthT } @Override - public CompletionStage supportsMultiDb(SecurityPlan securityPlan, AuthToken authToken) { + public CompletionStage supportsMultiDb( + BoltServerAddress ignoredAddress, + RoutingContext ignoredRoutingContext, + BoltAgent ignoredBoltAgent, + String ignoredUserAgent, + int ignoredConnectTimeoutMillis, + SecurityPlan securityPlan, + AuthToken authToken) { return detectFeature( securityPlan, authToken, @@ -249,7 +255,14 @@ public CompletionStage supportsMultiDb(SecurityPlan securityPlan, AuthT } @Override - public CompletionStage supportsSessionAuth(SecurityPlan securityPlan, AuthToken authToken) { + public CompletionStage supportsSessionAuth( + BoltServerAddress ignoredAddress, + RoutingContext ignoredRoutingContext, + BoltAgent ignoredBoltAgent, + String ignoredUserAgent, + int ignoredConnectTimeoutMillis, + SecurityPlan securityPlan, + AuthToken authToken) { return detectFeature( securityPlan, authToken, @@ -304,6 +317,11 @@ private CompletionStage detectFeature( } return get(address) .connect( + address, + routingContext, + boltAgent, + userAgent, + connectTimeoutMillis, securityPlan, null, () -> CompletableFuture.completedStage(authToken), @@ -390,6 +408,11 @@ private void acquire( get(address) .connect( + address, + routingContext, + boltAgent, + userAgent, + connectTimeoutMillis, securityPlan, database, authTokenStageSupplier, @@ -487,8 +510,7 @@ public CompletionStage close() { private synchronized BoltConnectionProvider get(BoltServerAddress address) { var provider = addressToProvider.get(address); if (provider == null) { - provider = boltConnectionProviderSupplier.get(); - provider.init(address, routingContext, boltAgent, userAgent, connectTimeoutMillis, metricsListener); + provider = boltConnectionProviderFunction.apply(address); addressToProvider.put(address, provider); } return provider; diff --git a/bolt-api-routed/src/main/java/org/neo4j/driver/internal/bolt/routedimpl/impl/cluster/RediscoveryImpl.java b/bolt-api-routed/src/main/java/org/neo4j/driver/internal/bolt/routedimpl/impl/cluster/RediscoveryImpl.java index 29769041f2..7ae0969444 100644 --- a/bolt-api-routed/src/main/java/org/neo4j/driver/internal/bolt/routedimpl/impl/cluster/RediscoveryImpl.java +++ b/bolt-api-routed/src/main/java/org/neo4j/driver/internal/bolt/routedimpl/impl/cluster/RediscoveryImpl.java @@ -37,6 +37,7 @@ import javax.net.ssl.SSLHandshakeException; import org.neo4j.driver.internal.bolt.api.AccessMode; import org.neo4j.driver.internal.bolt.api.AuthToken; +import org.neo4j.driver.internal.bolt.api.BoltAgent; import org.neo4j.driver.internal.bolt.api.BoltConnection; import org.neo4j.driver.internal.bolt.api.BoltConnectionProvider; import org.neo4j.driver.internal.bolt.api.BoltProtocolVersion; @@ -45,6 +46,7 @@ import org.neo4j.driver.internal.bolt.api.DomainNameResolver; import org.neo4j.driver.internal.bolt.api.LoggingProvider; import org.neo4j.driver.internal.bolt.api.ResponseHandler; +import org.neo4j.driver.internal.bolt.api.RoutingContext; import org.neo4j.driver.internal.bolt.api.SecurityPlan; import org.neo4j.driver.internal.bolt.api.exception.BoltDiscoveryException; import org.neo4j.driver.internal.bolt.api.exception.BoltFailureException; @@ -78,16 +80,28 @@ public class RediscoveryImpl implements Rediscovery { private final System.Logger log; private final Function> resolver; private final DomainNameResolver domainNameResolver; + private final RoutingContext routingContext; + private final BoltAgent boltAgent; + private final String userAgent; + private final int connectTimeoutMillis; public RediscoveryImpl( BoltServerAddress initialRouter, Function> resolver, LoggingProvider logging, - DomainNameResolver domainNameResolver) { + DomainNameResolver domainNameResolver, + RoutingContext routingContext, + BoltAgent boltAgent, + String userAgent, + int connectTimeoutMillis) { this.initialRouter = initialRouter; this.log = logging.getLog(getClass()); this.resolver = resolver; this.domainNameResolver = requireNonNull(domainNameResolver); + this.routingContext = routingContext; + this.boltAgent = boltAgent; + this.userAgent = userAgent; + this.connectTimeoutMillis = connectTimeoutMillis; } @Override @@ -360,6 +374,11 @@ private CompletionStage lookupOnRouter( .thenCompose(address -> connectionProviderGetter .apply(address) .connect( + address, + routingContext, + boltAgent, + userAgent, + connectTimeoutMillis, securityPlan, null, authTokenStageSupplier, diff --git a/bolt-api-routed/src/test/java/org/neo4j/driver/internal/bolt/routedimpl/impl/cluster/RediscoveryTest.java b/bolt-api-routed/src/test/java/org/neo4j/driver/internal/bolt/routedimpl/impl/cluster/RediscoveryTest.java index dc5dd38263..7f5b018a08 100644 --- a/bolt-api-routed/src/test/java/org/neo4j/driver/internal/bolt/routedimpl/impl/cluster/RediscoveryTest.java +++ b/bolt-api-routed/src/test/java/org/neo4j/driver/internal/bolt/routedimpl/impl/cluster/RediscoveryTest.java @@ -25,6 +25,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.startsWith; @@ -60,6 +61,7 @@ import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mockito; import org.mockito.stubbing.Answer; +import org.neo4j.driver.internal.bolt.api.BoltAgent; import org.neo4j.driver.internal.bolt.api.BoltConnection; import org.neo4j.driver.internal.bolt.api.BoltConnectionProvider; import org.neo4j.driver.internal.bolt.api.BoltProtocolVersion; @@ -70,6 +72,7 @@ import org.neo4j.driver.internal.bolt.api.GqlStatusError; import org.neo4j.driver.internal.bolt.api.LoggingProvider; import org.neo4j.driver.internal.bolt.api.ResponseHandler; +import org.neo4j.driver.internal.bolt.api.RoutingContext; import org.neo4j.driver.internal.bolt.api.SecurityPlan; import org.neo4j.driver.internal.bolt.api.exception.BoltFailureException; import org.neo4j.driver.internal.bolt.api.exception.BoltServiceUnavailableException; @@ -566,7 +569,15 @@ void shouldNotLogWhenSingleRetryAttemptFails() { var logging = mock(LoggingProvider.class); var logger = mock(System.Logger.class); when(logging.getLog(any(Class.class))).thenReturn(logger); - Rediscovery rediscovery = new RediscoveryImpl(A, resolver, logging, DefaultDomainNameResolver.getInstance()); + Rediscovery rediscovery = new RediscoveryImpl( + A, + resolver, + logging, + DefaultDomainNameResolver.getInstance(), + RoutingContext.EMPTY, + mock(BoltAgent.class), + "userAgent", + 0); var table = routingTableMock(A); Throwable e = assertThrows(CompletionException.class, () -> rediscovery @@ -596,7 +607,15 @@ void shouldResolveToIP() throws UnknownHostException { var domainNameResolver = mock(DomainNameResolver.class); var localhost = InetAddress.getLocalHost(); when(domainNameResolver.resolve(A.host())).thenReturn(new InetAddress[] {localhost}); - Rediscovery rediscovery = new RediscoveryImpl(A, resolver, NoopLoggingProvider.INSTANCE, domainNameResolver); + Rediscovery rediscovery = new RediscoveryImpl( + A, + resolver, + NoopLoggingProvider.INSTANCE, + domainNameResolver, + RoutingContext.EMPTY, + mock(BoltAgent.class), + "userAgent", + 0); var addresses = rediscovery.resolve(); @@ -676,7 +695,9 @@ void shouldLogScopedIPV6AddressWithStringFormattingLogger() throws UnknownHostEx given(domainNameResolver.resolve(initialRouter.host())).willReturn(new InetAddress[] {address}); var table = routingTableMock(true); var pool = mock(BoltConnectionProvider.class); - given(pool.connect(any(), any(), any(), any(), any(), any(), any(), any(), any(), any())) + given(pool.connect( + any(), any(), any(), any(), anyInt(), any(), any(), any(), any(), any(), any(), any(), any(), + any(), any())) .willReturn(failedFuture(new BoltServiceUnavailableException("not available"))); var logging = mock(LoggingProvider.class); var logger = mock(System.Logger.class); @@ -684,7 +705,15 @@ void shouldLogScopedIPV6AddressWithStringFormattingLogger() throws UnknownHostEx doAnswer(invocationOnMock -> String.format(invocationOnMock.getArgument(0), invocationOnMock.getArgument(1))) .when(logger) .log(eq(System.Logger.Level.WARNING), anyString()); - var rediscovery = new RediscoveryImpl(initialRouter, resolver, logging, domainNameResolver); + var rediscovery = new RediscoveryImpl( + initialRouter, + resolver, + logging, + domainNameResolver, + RoutingContext.EMPTY, + mock(BoltAgent.class), + "userAgent", + 0); // WHEN & THEN Throwable e = assertThrows(CompletionException.class, () -> rediscovery @@ -712,7 +741,15 @@ private Rediscovery newRediscovery( BoltServerAddress initialRouter, Function> resolver, LoggingProvider loggingProvider) { - return new RediscoveryImpl(initialRouter, resolver, loggingProvider, DefaultDomainNameResolver.getInstance()); + return new RediscoveryImpl( + initialRouter, + resolver, + loggingProvider, + DefaultDomainNameResolver.getInstance(), + RoutingContext.EMPTY, + mock(BoltAgent.class), + "userAgent", + 0); } private Function connectionProviderGetter( @@ -722,7 +759,9 @@ private Function connectionProviderGe var boltConnection = setupConnection(entry.getValue()); var boltConnectionProvider = mock(BoltConnectionProvider.class); - given(boltConnectionProvider.connect(any(), any(), any(), any(), any(), any(), any(), any(), any(), any())) + given(boltConnectionProvider.connect( + any(), any(), any(), any(), anyInt(), any(), any(), any(), any(), any(), any(), any(), + any(), any(), any())) .willReturn(completedFuture(boltConnection)); addressToProvider.put(entry.getKey(), boltConnectionProvider); diff --git a/bolt-api-routed/src/test/java/org/neo4j/driver/internal/bolt/routedimpl/impl/cluster/RoutingTableHandlerTest.java b/bolt-api-routed/src/test/java/org/neo4j/driver/internal/bolt/routedimpl/impl/cluster/RoutingTableHandlerTest.java index 2a9e4369b9..34265a6853 100644 --- a/bolt-api-routed/src/test/java/org/neo4j/driver/internal/bolt/routedimpl/impl/cluster/RoutingTableHandlerTest.java +++ b/bolt-api-routed/src/test/java/org/neo4j/driver/internal/bolt/routedimpl/impl/cluster/RoutingTableHandlerTest.java @@ -253,7 +253,21 @@ private void testRediscoveryWhenStale(AccessMode mode) { var boltConnectionProvider = mock(BoltConnectionProvider.class); var connection = mock(BoltConnection.class); given(boltConnectionProvider.connect( - any(), any(), any(), any(), any(), any(), any(), any(), any(), Collections.emptyMap())) + any(), + any(), + any(), + any(), + any(), + any(), + any(), + any(), + any(), + any(), + any(), + any(), + any(), + any(), + Collections.emptyMap())) .willReturn(completedFuture(connection)); return boltConnectionProvider; }; @@ -283,7 +297,21 @@ private void testNoRediscoveryWhenNotStale(AccessMode staleMode, AccessMode notS var boltConnectionProvider = mock(BoltConnectionProvider.class); var connection = mock(BoltConnection.class); given(boltConnectionProvider.connect( - any(), any(), any(), any(), any(), any(), any(), any(), any(), Collections.emptyMap())) + any(), + any(), + any(), + any(), + any(), + any(), + any(), + any(), + any(), + any(), + any(), + any(), + any(), + any(), + Collections.emptyMap())) .willReturn(completedFuture(connection)); return boltConnectionProvider; }; @@ -344,7 +372,21 @@ private static Function newConnection var boltConnectionProvider = mock(BoltConnectionProvider.class); if (unavailableAddresses.contains(requestedAddress)) { given(boltConnectionProvider.connect( - any(), any(), any(), any(), any(), any(), any(), any(), any(), Collections.emptyMap())) + any(), + any(), + any(), + any(), + any(), + any(), + any(), + any(), + any(), + any(), + any(), + any(), + any(), + any(), + Collections.emptyMap())) .willReturn(CompletableFuture.failedFuture( new BoltServiceUnavailableException(requestedAddress + " is unavailable!"))); return boltConnectionProvider; @@ -352,7 +394,21 @@ private static Function newConnection var connection = mock(BoltConnection.class); when(connection.serverAddress()).thenReturn(requestedAddress); given(boltConnectionProvider.connect( - any(), any(), any(), any(), any(), any(), any(), any(), any(), Collections.emptyMap())) + any(), + any(), + any(), + any(), + any(), + any(), + any(), + any(), + any(), + any(), + any(), + any(), + any(), + any(), + Collections.emptyMap())) .willReturn(completedFuture(connection)); return boltConnectionProvider; }; diff --git a/bolt-api/src/main/java/org/neo4j/driver/internal/bolt/api/BoltConnectionProvider.java b/bolt-api/src/main/java/org/neo4j/driver/internal/bolt/api/BoltConnectionProvider.java index 3842951156..54883b8a2d 100644 --- a/bolt-api/src/main/java/org/neo4j/driver/internal/bolt/api/BoltConnectionProvider.java +++ b/bolt-api/src/main/java/org/neo4j/driver/internal/bolt/api/BoltConnectionProvider.java @@ -23,15 +23,12 @@ import java.util.function.Supplier; public interface BoltConnectionProvider { - CompletionStage init( + CompletionStage connect( BoltServerAddress address, RoutingContext routingContext, BoltAgent boltAgent, String userAgent, int connectTimeoutMillis, - MetricsListener metricsListener); - - CompletionStage connect( SecurityPlan securityPlan, DatabaseName databaseName, Supplier> authTokenStageSupplier, @@ -43,11 +40,32 @@ CompletionStage connect( Consumer databaseNameConsumer, Map additionalParameters); - CompletionStage verifyConnectivity(SecurityPlan securityPlan, AuthToken authToken); + CompletionStage verifyConnectivity( + BoltServerAddress address, + RoutingContext routingContext, + BoltAgent boltAgent, + String userAgent, + int connectTimeoutMillis, + SecurityPlan securityPlan, + AuthToken authToken); - CompletionStage supportsMultiDb(SecurityPlan securityPlan, AuthToken authToken); + CompletionStage supportsMultiDb( + BoltServerAddress address, + RoutingContext routingContext, + BoltAgent boltAgent, + String userAgent, + int connectTimeoutMillis, + SecurityPlan securityPlan, + AuthToken authToken); - CompletionStage supportsSessionAuth(SecurityPlan securityPlan, AuthToken authToken); + CompletionStage supportsSessionAuth( + BoltServerAddress address, + RoutingContext routingContext, + BoltAgent boltAgent, + String userAgent, + int connectTimeoutMillis, + SecurityPlan securityPlan, + AuthToken authToken); CompletionStage close(); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java index ed38f4fe2f..bf2b335b5d 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java +++ b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java @@ -42,11 +42,13 @@ import org.neo4j.driver.internal.adaptedbolt.AdaptingDriverBoltConnectionProvider; import org.neo4j.driver.internal.adaptedbolt.DriverBoltConnectionProvider; import org.neo4j.driver.internal.adaptedbolt.ErrorMapper; +import org.neo4j.driver.internal.bolt.api.BoltAgent; import org.neo4j.driver.internal.bolt.api.BoltConnectionProvider; import org.neo4j.driver.internal.bolt.api.BoltServerAddress; import org.neo4j.driver.internal.bolt.api.DefaultDomainNameResolver; import org.neo4j.driver.internal.bolt.api.DomainNameResolver; import org.neo4j.driver.internal.bolt.api.LoggingProvider; +import org.neo4j.driver.internal.bolt.api.MetricsListener; import org.neo4j.driver.internal.bolt.api.RoutingContext; import org.neo4j.driver.internal.bolt.basicimpl.BootstrapFactory; import org.neo4j.driver.internal.bolt.basicimpl.NettyBoltConnectionProvider; @@ -169,8 +171,12 @@ private InternalDriver createDriver( var homeDatabaseCache = HomeDatabaseCache.newInstance(uri.getScheme().startsWith("neo4j")); boltConnectionProvider = createBoltConnectionProvider( - uri, config, eventLoopGroup, routingSettings, rediscoverySupplier, homeDatabaseCache); - boltConnectionProvider.init( + uri, + config, + eventLoopGroup, + routingSettings, + rediscoverySupplier, + homeDatabaseCache, new BoltServerAddress(address.host(), address.port()), new RoutingContext(uri), DriverInfoUtil.boltAgent(), @@ -223,18 +229,42 @@ private DriverBoltConnectionProvider createBoltConnectionProvider( EventLoopGroup eventLoopGroup, RoutingSettings routingSettings, Supplier rediscoverySupplier, - BoltConnectionListener boltConnectionListener) { + BoltConnectionListener boltConnectionListener, + BoltServerAddress address, + RoutingContext routingContext, + BoltAgent boltAgent, + String userAgent, + int connectTimeoutMillis, + MetricsListener metricsListener) { DriverBoltConnectionProvider boltConnectionProvider; var clock = createClock(); var loggingProvider = new BoltLoggingProvider(config.logging()); - Supplier pooledBoltConnectionProviderSupplier = - () -> createPooledBoltConnectionProvider( - config, eventLoopGroup, clock, loggingProvider, boltConnectionListener); + Function pooledBoltConnectionProviderSupplier = + selectedAddress -> createPooledBoltConnectionProvider( + config, + eventLoopGroup, + clock, + loggingProvider, + boltConnectionListener, + selectedAddress, + routingContext, + boltAgent, + userAgent, + connectTimeoutMillis, + metricsListener); var errorMapper = ErrorMapper.getInstance(); if (uri.getScheme().startsWith("bolt")) { assertNoRoutingContext(uri, routingSettings); boltConnectionProvider = new AdaptingDriverBoltConnectionProvider( - pooledBoltConnectionProviderSupplier.get(), errorMapper, BoltValueFactory.getInstance(), false); + pooledBoltConnectionProviderSupplier.apply(address), + errorMapper, + BoltValueFactory.getInstance(), + false, + address, + routingContext, + boltAgent, + userAgent, + connectTimeoutMillis); } else { boltConnectionProvider = new AdaptingDriverBoltConnectionProvider( createRoutedBoltConnectionProvider( @@ -243,31 +273,54 @@ private DriverBoltConnectionProvider createBoltConnectionProvider( routingSettings, rediscoverySupplier, clock, - loggingProvider), + loggingProvider, + address, + routingContext, + boltAgent, + userAgent, + connectTimeoutMillis, + metricsListener), errorMapper, BoltValueFactory.getInstance(), - true); + true, + address, + routingContext, + boltAgent, + userAgent, + connectTimeoutMillis); } return boltConnectionProvider; } private BoltConnectionProvider createRoutedBoltConnectionProvider( Config config, - Supplier pooledBoltConnectionProviderSupplier, + Function pooledBoltConnectionProviderFunction, RoutingSettings routingSettings, Supplier rediscoverySupplier, Clock clock, - LoggingProvider loggingProvider) { + LoggingProvider loggingProvider, + BoltServerAddress address, + RoutingContext routingContext, + BoltAgent boltAgent, + String userAgent, + int connectTimeoutMillis, + MetricsListener metricsListener) { var boltServerAddressResolver = createBoltServerAddressResolver(config); var rediscovery = rediscoverySupplier != null ? rediscoverySupplier.get() : null; return new RoutedBoltConnectionProvider( - pooledBoltConnectionProviderSupplier, + pooledBoltConnectionProviderFunction, boltServerAddressResolver, getDomainNameResolver(), routingSettings.routingTablePurgeDelayMs(), rediscovery, clock, - loggingProvider); + loggingProvider, + address, + routingContext, + boltAgent, + userAgent, + connectTimeoutMillis, + metricsListener); } private BoltConnectionProvider createPooledBoltConnectionProvider( @@ -275,7 +328,13 @@ private BoltConnectionProvider createPooledBoltConnectionProvider( EventLoopGroup eventLoopGroup, Clock clock, LoggingProvider loggingProvider, - BoltConnectionListener boltConnectionListener) { + BoltConnectionListener boltConnectionListener, + BoltServerAddress address, + RoutingContext routingContext, + BoltAgent boltAgent, + String userAgent, + int connectTimeoutMillis, + MetricsListener metricsListener) { var nettyBoltConnectionProvider = createNettyBoltConnectionProvider(eventLoopGroup, clock, loggingProvider); nettyBoltConnectionProvider = BoltConnectionListener.listeningBoltConnectionProvider( nettyBoltConnectionProvider, boltConnectionListener); @@ -286,7 +345,13 @@ private BoltConnectionProvider createPooledBoltConnectionProvider( config.maxConnectionLifetimeMillis(), config.idleTimeBeforeConnectionTest(), clock, - loggingProvider); + loggingProvider, + metricsListener, + address, + routingContext, + boltAgent, + userAgent, + connectTimeoutMillis); } private BoltConnectionProvider createNettyBoltConnectionProvider( @@ -297,7 +362,8 @@ private BoltConnectionProvider createNettyBoltConnectionProvider( getDomainNameResolver(), localAddress(), loggingProvider, - BoltValueFactory.getInstance()); + BoltValueFactory.getInstance(), + null); } @SuppressWarnings("SameReturnValue") diff --git a/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/AdaptingDriverBoltConnectionProvider.java b/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/AdaptingDriverBoltConnectionProvider.java index 8a549020a0..f4761664ab 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/AdaptingDriverBoltConnectionProvider.java +++ b/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/AdaptingDriverBoltConnectionProvider.java @@ -30,7 +30,6 @@ import org.neo4j.driver.internal.bolt.api.BoltProtocolVersion; import org.neo4j.driver.internal.bolt.api.BoltServerAddress; import org.neo4j.driver.internal.bolt.api.DatabaseName; -import org.neo4j.driver.internal.bolt.api.MetricsListener; import org.neo4j.driver.internal.bolt.api.NotificationConfig; import org.neo4j.driver.internal.bolt.api.RoutingContext; import org.neo4j.driver.internal.bolt.api.SecurityPlan; @@ -41,28 +40,31 @@ public class AdaptingDriverBoltConnectionProvider implements DriverBoltConnectio private final ErrorMapper errorMapper; private final BoltValueFactory boltValueFactory; private final boolean routed; + private final BoltServerAddress address; + private final RoutingContext routingContext; + private final BoltAgent boltAgent; + private final String userAgent; + private final int connectTimeoutMillis; public AdaptingDriverBoltConnectionProvider( BoltConnectionProvider delegate, ErrorMapper errorMapper, BoltValueFactory boltValueFactory, - boolean routed) { - this.delegate = Objects.requireNonNull(delegate); - this.errorMapper = Objects.requireNonNull(errorMapper); - this.boltValueFactory = Objects.requireNonNull(boltValueFactory); - this.routed = routed; - } - - @Override - public CompletionStage init( + boolean routed, BoltServerAddress address, RoutingContext routingContext, BoltAgent boltAgent, String userAgent, - int connectTimeoutMillis, - MetricsListener metricsListener) { - return delegate.init(address, routingContext, boltAgent, userAgent, connectTimeoutMillis, metricsListener) - .exceptionally(errorMapper::mapAndTrow); + int connectTimeoutMillis) { + this.delegate = Objects.requireNonNull(delegate); + this.errorMapper = Objects.requireNonNull(errorMapper); + this.boltValueFactory = Objects.requireNonNull(boltValueFactory); + this.routed = routed; + this.address = Objects.requireNonNull(address); + this.routingContext = Objects.requireNonNull(routingContext); + this.boltAgent = Objects.requireNonNull(boltAgent); + this.userAgent = Objects.requireNonNull(userAgent); + this.connectTimeoutMillis = connectTimeoutMillis; } @Override @@ -78,6 +80,11 @@ public CompletionStage connect( Consumer databaseNameConsumer, Map additionalParameters) { return delegate.connect( + address, + routingContext, + boltAgent, + userAgent, + connectTimeoutMillis, securityPlan, databaseName, () -> authMapStageSupplier @@ -99,19 +106,40 @@ public CompletionStage connect( @Override public CompletionStage verifyConnectivity(SecurityPlan securityPlan, Map authMap) { - return delegate.verifyConnectivity(securityPlan, AuthTokens.custom(boltValueFactory.toBoltMap(authMap))) + return delegate.verifyConnectivity( + address, + routingContext, + boltAgent, + userAgent, + connectTimeoutMillis, + securityPlan, + AuthTokens.custom(boltValueFactory.toBoltMap(authMap))) .exceptionally(errorMapper::mapAndTrow); } @Override public CompletionStage supportsMultiDb(SecurityPlan securityPlan, Map authMap) { - return delegate.supportsMultiDb(securityPlan, AuthTokens.custom(boltValueFactory.toBoltMap(authMap))) + return delegate.supportsMultiDb( + address, + routingContext, + boltAgent, + userAgent, + connectTimeoutMillis, + securityPlan, + AuthTokens.custom(boltValueFactory.toBoltMap(authMap))) .exceptionally(errorMapper::mapAndTrow); } @Override public CompletionStage supportsSessionAuth(SecurityPlan securityPlan, Map authMap) { - return delegate.supportsSessionAuth(securityPlan, AuthTokens.custom(boltValueFactory.toBoltMap(authMap))) + return delegate.supportsSessionAuth( + address, + routingContext, + boltAgent, + userAgent, + connectTimeoutMillis, + securityPlan, + AuthTokens.custom(boltValueFactory.toBoltMap(authMap))) .exceptionally(errorMapper::mapAndTrow); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/DriverBoltConnectionProvider.java b/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/DriverBoltConnectionProvider.java index 9ec92c91e9..5ec72c81d5 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/DriverBoltConnectionProvider.java +++ b/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/DriverBoltConnectionProvider.java @@ -23,24 +23,12 @@ import java.util.function.Supplier; import org.neo4j.driver.Value; import org.neo4j.driver.internal.bolt.api.AccessMode; -import org.neo4j.driver.internal.bolt.api.BoltAgent; import org.neo4j.driver.internal.bolt.api.BoltProtocolVersion; -import org.neo4j.driver.internal.bolt.api.BoltServerAddress; import org.neo4j.driver.internal.bolt.api.DatabaseName; -import org.neo4j.driver.internal.bolt.api.MetricsListener; import org.neo4j.driver.internal.bolt.api.NotificationConfig; -import org.neo4j.driver.internal.bolt.api.RoutingContext; import org.neo4j.driver.internal.bolt.api.SecurityPlan; public interface DriverBoltConnectionProvider { - @SuppressWarnings("UnusedReturnValue") - CompletionStage init( - BoltServerAddress address, - RoutingContext routingContext, - BoltAgent boltAgent, - String userAgent, - int connectTimeoutMillis, - MetricsListener metricsListener); CompletionStage connect( SecurityPlan securityPlan, diff --git a/driver/src/main/java/org/neo4j/driver/internal/boltlistener/ListeningBoltConnectionProvider.java b/driver/src/main/java/org/neo4j/driver/internal/boltlistener/ListeningBoltConnectionProvider.java index cad70d7936..0456311b59 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/boltlistener/ListeningBoltConnectionProvider.java +++ b/driver/src/main/java/org/neo4j/driver/internal/boltlistener/ListeningBoltConnectionProvider.java @@ -30,7 +30,6 @@ import org.neo4j.driver.internal.bolt.api.BoltProtocolVersion; import org.neo4j.driver.internal.bolt.api.BoltServerAddress; import org.neo4j.driver.internal.bolt.api.DatabaseName; -import org.neo4j.driver.internal.bolt.api.MetricsListener; import org.neo4j.driver.internal.bolt.api.NotificationConfig; import org.neo4j.driver.internal.bolt.api.RoutingContext; import org.neo4j.driver.internal.bolt.api.SecurityPlan; @@ -46,18 +45,12 @@ public ListeningBoltConnectionProvider( } @Override - public CompletionStage init( + public CompletionStage connect( BoltServerAddress address, RoutingContext routingContext, BoltAgent boltAgent, String userAgent, int connectTimeoutMillis, - MetricsListener metricsListener) { - return delegate.init(address, routingContext, boltAgent, userAgent, connectTimeoutMillis, metricsListener); - } - - @Override - public CompletionStage connect( SecurityPlan securityPlan, DatabaseName databaseName, Supplier> authTokenStageSupplier, @@ -69,6 +62,11 @@ public CompletionStage connect( Consumer databaseNameConsumer, Map additionalParameters) { return delegate.connect( + address, + routingContext, + boltAgent, + userAgent, + connectTimeoutMillis, securityPlan, databaseName, authTokenStageSupplier, @@ -87,18 +85,42 @@ public CompletionStage connect( } @Override - public CompletionStage verifyConnectivity(SecurityPlan securityPlan, AuthToken authToken) { - return delegate.verifyConnectivity(securityPlan, authToken); + public CompletionStage verifyConnectivity( + BoltServerAddress address, + RoutingContext routingContext, + BoltAgent boltAgent, + String userAgent, + int connectTimeoutMillis, + SecurityPlan securityPlan, + AuthToken authToken) { + return delegate.verifyConnectivity( + address, routingContext, boltAgent, userAgent, connectTimeoutMillis, securityPlan, authToken); } @Override - public CompletionStage supportsMultiDb(SecurityPlan securityPlan, AuthToken authToken) { - return delegate.supportsMultiDb(securityPlan, authToken); + public CompletionStage supportsMultiDb( + BoltServerAddress address, + RoutingContext routingContext, + BoltAgent boltAgent, + String userAgent, + int connectTimeoutMillis, + SecurityPlan securityPlan, + AuthToken authToken) { + return delegate.supportsMultiDb( + address, routingContext, boltAgent, userAgent, connectTimeoutMillis, securityPlan, authToken); } @Override - public CompletionStage supportsSessionAuth(SecurityPlan securityPlan, AuthToken authToken) { - return delegate.supportsSessionAuth(securityPlan, authToken); + public CompletionStage supportsSessionAuth( + BoltServerAddress address, + RoutingContext routingContext, + BoltAgent boltAgent, + String userAgent, + int connectTimeoutMillis, + SecurityPlan securityPlan, + AuthToken authToken) { + return delegate.supportsSessionAuth( + address, routingContext, boltAgent, userAgent, connectTimeoutMillis, securityPlan, authToken); } @Override diff --git a/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveSessionIT.java b/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveSessionIT.java index 3f48d5878f..ab03f8e4d9 100644 --- a/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveSessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/integration/reactive/ReactiveSessionIT.java @@ -34,6 +34,7 @@ import java.util.concurrent.Flow; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; +import java.util.logging.Level; import java.util.stream.IntStream; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -42,13 +43,13 @@ import org.junit.jupiter.params.provider.ValueSource; import org.neo4j.driver.Config; import org.neo4j.driver.ConnectionPoolMetrics; +import org.neo4j.driver.Logging; import org.neo4j.driver.exceptions.ClientException; import org.neo4j.driver.exceptions.ServiceUnavailableException; import org.neo4j.driver.internal.util.EnabledOnNeo4jWith; import org.neo4j.driver.reactive.ReactiveResult; import org.neo4j.driver.reactive.ReactiveSession; import org.neo4j.driver.testutil.DatabaseExtension; -import org.neo4j.driver.testutil.LoggingUtil; import org.neo4j.driver.testutil.ParallelizableIT; import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; @@ -85,7 +86,7 @@ void shouldReleaseResultsOnSubscriptionCancellation(boolean request) throws Inte var messages = Collections.synchronizedList(new ArrayList()); var config = Config.builder() .withDriverMetrics() - .withLogging(LoggingUtil.boltLogging(messages)) + .withLogging(Logging.console(Level.FINE)) .build(); try (var driver = neo4j.customDriver(config)) { // verify the database is available as runs may not report errors due to the subscription cancellation