diff --git a/driver/pom.xml b/driver/pom.xml index 6dd4eed4ea..947d7b6f33 100644 --- a/driver/pom.xml +++ b/driver/pom.xml @@ -29,6 +29,10 @@ io.netty netty-handler + + org.hdrhistogram + HdrHistogram + @@ -208,6 +212,7 @@ io.netty:* + org.hdrhistogram:* @@ -215,6 +220,10 @@ io.netty org.neo4j.driver.internal.shaded.io.netty + + org.HdrHistogram + org.neo4j.driver.internal.shaded.org.HdrHistogram + true diff --git a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java index 9de1eca0b1..7f0a2d533d 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java +++ b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java @@ -38,6 +38,10 @@ import org.neo4j.driver.internal.cluster.loadbalancing.LoadBalancingStrategy; import org.neo4j.driver.internal.cluster.loadbalancing.RoundRobinLoadBalancingStrategy; import org.neo4j.driver.internal.logging.NettyLogging; +import org.neo4j.driver.internal.metrics.MetricsListener; +import org.neo4j.driver.internal.metrics.InternalAbstractMetrics; +import org.neo4j.driver.internal.metrics.InternalMetrics; +import org.neo4j.driver.internal.metrics.spi.Metrics; import org.neo4j.driver.internal.retry.ExponentialBackoffRetryLogic; import org.neo4j.driver.internal.retry.RetryLogic; import org.neo4j.driver.internal.retry.RetrySettings; @@ -56,6 +60,8 @@ import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; import static java.lang.String.format; +import static org.neo4j.driver.internal.metrics.InternalAbstractMetrics.DEV_NULL_METRICS; +import static org.neo4j.driver.internal.metrics.spi.Metrics.isMetricsEnabled; import static org.neo4j.driver.internal.security.SecurityPlan.insecure; public class DriverFactory @@ -77,18 +83,17 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r EventExecutorGroup eventExecutorGroup = bootstrap.config().group(); RetryLogic retryLogic = createRetryLogic( retrySettings, eventExecutorGroup, config.logging() ); - ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, bootstrap, config ); + InternalAbstractMetrics metrics = createDriverMetrics( config ); + ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, bootstrap, metrics, config ); - InternalDriver driver = createDriver( uri, address, connectionPool, config, newRoutingSettings, - eventExecutorGroup, securityPlan, retryLogic ); + InternalDriver driver = createDriver( uri, securityPlan, address, connectionPool, eventExecutorGroup, newRoutingSettings, retryLogic, metrics, config ); verifyConnectivity( driver, connectionPool, config ); return driver; } - protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, - Bootstrap bootstrap, Config config ) + protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap, MetricsListener metrics, Config config ) { Clock clock = createClock(); ConnectionSettings settings = new ConnectionSettings( authToken, config.connectionTimeoutMillis() ); @@ -97,7 +102,19 @@ protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan config.connectionAcquisitionTimeoutMillis(), config.maxConnectionLifetimeMillis(), config.idleTimeBeforeConnectionTest() ); - return new ConnectionPoolImpl( connector, bootstrap, poolSettings, config.logging(), clock ); + return new ConnectionPoolImpl( connector, bootstrap, poolSettings, metrics, config.logging(), clock ); + } + + protected static InternalAbstractMetrics createDriverMetrics( Config config ) + { + if( isMetricsEnabled() ) + { + return new InternalMetrics( config ); + } + else + { + return DEV_NULL_METRICS; + } } protected ChannelConnector createConnector( ConnectionSettings settings, SecurityPlan securityPlan, @@ -106,9 +123,8 @@ protected ChannelConnector createConnector( ConnectionSettings settings, Securit return new ChannelConnectorImpl( settings, securityPlan, config.logging(), clock ); } - private InternalDriver createDriver( URI uri, BoltServerAddress address, - ConnectionPool connectionPool, Config config, RoutingSettings routingSettings, - EventExecutorGroup eventExecutorGroup, SecurityPlan securityPlan, RetryLogic retryLogic ) + private InternalDriver createDriver( URI uri, SecurityPlan securityPlan, BoltServerAddress address, ConnectionPool connectionPool, + EventExecutorGroup eventExecutorGroup, RoutingSettings routingSettings, RetryLogic retryLogic, Metrics metrics, Config config ) { try { @@ -117,10 +133,9 @@ private InternalDriver createDriver( URI uri, BoltServerAddress address, { case BOLT_URI_SCHEME: assertNoRoutingContext( uri, routingSettings ); - return createDirectDriver( address, config, securityPlan, retryLogic, connectionPool ); + return createDirectDriver( securityPlan, address, connectionPool, retryLogic, metrics, config ); case BOLT_ROUTING_URI_SCHEME: - return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan, retryLogic, - eventExecutorGroup ); + return createRoutingDriver( securityPlan, address, connectionPool, eventExecutorGroup, routingSettings, retryLogic, metrics, config ); default: throw new ClientException( format( "Unsupported URI scheme: %s", scheme ) ); } @@ -138,12 +153,12 @@ private InternalDriver createDriver( URI uri, BoltServerAddress address, *

* This method is protected only for testing */ - protected InternalDriver createDirectDriver( BoltServerAddress address, Config config, - SecurityPlan securityPlan, RetryLogic retryLogic, ConnectionPool connectionPool ) + protected InternalDriver createDirectDriver( SecurityPlan securityPlan, BoltServerAddress address, ConnectionPool connectionPool, RetryLogic retryLogic, + Metrics metrics, Config config ) { ConnectionProvider connectionProvider = new DirectConnectionProvider( address, connectionPool ); SessionFactory sessionFactory = createSessionFactory( connectionProvider, retryLogic, config ); - return createDriver( sessionFactory, securityPlan, config ); + return createDriver( securityPlan, sessionFactory, metrics, config ); } /** @@ -151,9 +166,8 @@ protected InternalDriver createDirectDriver( BoltServerAddress address, Config c *

* This method is protected only for testing */ - protected InternalDriver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool, - Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic, - EventExecutorGroup eventExecutorGroup ) + protected InternalDriver createRoutingDriver( SecurityPlan securityPlan, BoltServerAddress address, ConnectionPool connectionPool, + EventExecutorGroup eventExecutorGroup, RoutingSettings routingSettings, RetryLogic retryLogic, Metrics metrics, Config config ) { if ( !securityPlan.isRoutingCompatible() ) { @@ -162,7 +176,7 @@ protected InternalDriver createRoutingDriver( BoltServerAddress address, Connect ConnectionProvider connectionProvider = createLoadBalancer( address, connectionPool, eventExecutorGroup, config, routingSettings ); SessionFactory sessionFactory = createSessionFactory( connectionProvider, retryLogic, config ); - return createDriver( sessionFactory, securityPlan, config ); + return createDriver( securityPlan, sessionFactory, metrics, config ); } /** @@ -170,9 +184,9 @@ protected InternalDriver createRoutingDriver( BoltServerAddress address, Connect *

* This method is protected only for testing */ - protected InternalDriver createDriver( SessionFactory sessionFactory, SecurityPlan securityPlan, Config config ) + protected InternalDriver createDriver( SecurityPlan securityPlan, SessionFactory sessionFactory, Metrics metrics, Config config ) { - return new InternalDriver( securityPlan, sessionFactory, config.logging() ); + return new InternalDriver( securityPlan, sessionFactory, metrics, config.logging() ); } /** diff --git a/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java b/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java index 2528d493fd..3378ab3531 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java +++ b/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java @@ -21,6 +21,7 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.atomic.AtomicBoolean; +import org.neo4j.driver.internal.metrics.spi.Metrics; import org.neo4j.driver.internal.security.SecurityPlan; import org.neo4j.driver.internal.util.Futures; import org.neo4j.driver.v1.AccessMode; @@ -38,11 +39,13 @@ public class InternalDriver implements Driver private final Logger log; private AtomicBoolean closed = new AtomicBoolean( false ); + private final Metrics metrics; - InternalDriver( SecurityPlan securityPlan, SessionFactory sessionFactory, Logging logging ) + InternalDriver( SecurityPlan securityPlan, SessionFactory sessionFactory, Metrics metrics, Logging logging ) { this.securityPlan = securityPlan; this.sessionFactory = sessionFactory; + this.metrics = metrics; this.log = logging.getLog( Driver.class.getSimpleName() ); log.info( "Driver instance %s created", this ); } @@ -144,6 +147,11 @@ private void assertOpen() } } + public Metrics metrics() + { + return this.metrics; + } + private static RuntimeException driverCloseException() { return new IllegalStateException( "This driver instance has already been closed" ); diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java b/driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java index 3d3bb0c9e6..8035010730 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java @@ -34,6 +34,8 @@ import org.neo4j.driver.internal.messaging.PullAllMessage; import org.neo4j.driver.internal.messaging.ResetMessage; import org.neo4j.driver.internal.messaging.RunMessage; +import org.neo4j.driver.internal.metrics.ListenerEvent; +import org.neo4j.driver.internal.metrics.MetricsListener; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ResponseHandler; import org.neo4j.driver.internal.util.Clock; @@ -54,8 +56,10 @@ public class NettyConnection implements Connection private final Clock clock; private final AtomicReference status = new AtomicReference<>( Status.OPEN ); + private final MetricsListener metricsListener; + private final ListenerEvent inUseEvent; - public NettyConnection( Channel channel, ChannelPool channelPool, Clock clock ) + public NettyConnection( Channel channel, ChannelPool channelPool, Clock clock, MetricsListener metricsListener ) { this.channel = channel; this.messageDispatcher = ChannelAttributes.messageDispatcher( channel ); @@ -64,6 +68,9 @@ public NettyConnection( Channel channel, ChannelPool channelPool, Clock clock ) this.channelPool = channelPool; this.releaseFuture = new CompletableFuture<>(); this.clock = clock; + this.metricsListener = metricsListener; + this.inUseEvent = metricsListener.createListenerEvent(); + metricsListener.afterAcquiredOrCreated( this.serverAddress, this.inUseEvent ); } @Override @@ -124,6 +131,7 @@ public CompletionStage release() { if ( status.compareAndSet( Status.OPEN, Status.RELEASED ) ) { + metricsListener.afterReleased( this.serverAddress, this.inUseEvent ); ChannelReleasingResetResponseHandler handler = new ChannelReleasingResetResponseHandler( channel, channelPool, messageDispatcher, clock, releaseFuture ); @@ -137,6 +145,7 @@ public void terminateAndRelease( String reason ) { if ( status.compareAndSet( Status.OPEN, Status.TERMINATED ) ) { + metricsListener.afterReleased( this.serverAddress, this.inUseEvent ); setTerminationReason( channel, reason ); channel.close(); channelPool.release( channel ); diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/pool/ActiveChannelTracker.java b/driver/src/main/java/org/neo4j/driver/internal/async/pool/ActiveChannelTracker.java deleted file mode 100644 index 728c96da8a..0000000000 --- a/driver/src/main/java/org/neo4j/driver/internal/async/pool/ActiveChannelTracker.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Copyright (c) 2002-2018 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.driver.internal.async.pool; - -import io.netty.channel.Channel; -import io.netty.channel.pool.ChannelPoolHandler; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; - -import org.neo4j.driver.internal.BoltServerAddress; -import org.neo4j.driver.v1.Logger; -import org.neo4j.driver.v1.Logging; - -import static org.neo4j.driver.internal.async.ChannelAttributes.serverAddress; - -public class ActiveChannelTracker implements ChannelPoolHandler -{ - private final Map addressToActiveChannelCount = new ConcurrentHashMap<>(); - private final Logger log; - - public ActiveChannelTracker( Logging logging ) - { - this.log = logging.getLog( getClass().getSimpleName() ); - } - - @Override - public void channelReleased( Channel channel ) - { - log.debug( "Channel %s released back to the pool", channel ); - channelInactive( channel ); - } - - @Override - public void channelAcquired( Channel channel ) - { - log.debug( "Channel %s acquired from the pool", channel ); - channelActive( channel ); - } - - @Override - public void channelCreated( Channel channel ) - { - log.debug( "Channel %s created", channel ); - channelActive( channel ); - } - - public int activeChannelCount( BoltServerAddress address ) - { - AtomicInteger count = addressToActiveChannelCount.get( address ); - return count == null ? 0 : count.get(); - } - - private void channelActive( Channel channel ) - { - BoltServerAddress address = serverAddress( channel ); - AtomicInteger count = addressToActiveChannelCount.computeIfAbsent( address, k -> new AtomicInteger() ); - count.incrementAndGet(); - } - - private void channelInactive( Channel channel ) - { - BoltServerAddress address = serverAddress( channel ); - AtomicInteger count = addressToActiveChannelCount.get( address ); - if ( count == null ) - { - throw new IllegalStateException( "No count exist for address '" + address + "'" ); - } - count.decrementAndGet(); - } -} diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java b/driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java index 264e6286bd..474ec9e56e 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java @@ -36,6 +36,8 @@ import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.internal.async.ChannelConnector; import org.neo4j.driver.internal.async.NettyConnection; +import org.neo4j.driver.internal.metrics.ListenerEvent; +import org.neo4j.driver.internal.metrics.MetricsListener; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ConnectionPool; import org.neo4j.driver.internal.util.Clock; @@ -48,29 +50,31 @@ public class ConnectionPoolImpl implements ConnectionPool { private final ChannelConnector connector; private final Bootstrap bootstrap; - private final ActiveChannelTracker activeChannelTracker; + private final NettyChannelTracker nettyChannelTracker; private final NettyChannelHealthChecker channelHealthChecker; private final PoolSettings settings; private final Clock clock; private final Logger log; + private MetricsListener metricsListener; private final ConcurrentMap pools = new ConcurrentHashMap<>(); private final AtomicBoolean closed = new AtomicBoolean(); public ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, PoolSettings settings, - Logging logging, Clock clock ) + MetricsListener metricsListener, Logging logging, Clock clock ) { - this( connector, bootstrap, new ActiveChannelTracker( logging ), settings, logging, clock ); + this( connector, bootstrap, new NettyChannelTracker( metricsListener, logging ), settings, metricsListener, logging, clock ); } - ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, ActiveChannelTracker activeChannelTracker, - PoolSettings settings, Logging logging, Clock clock ) + ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, NettyChannelTracker nettyChannelTracker, + PoolSettings settings, MetricsListener metricsListener, Logging logging, Clock clock ) { this.connector = connector; this.bootstrap = bootstrap; - this.activeChannelTracker = activeChannelTracker; + this.nettyChannelTracker = nettyChannelTracker; this.channelHealthChecker = new NettyChannelHealthChecker( settings, clock, logging ); this.settings = settings; + this.metricsListener = metricsListener; this.clock = clock; this.log = logging.getLog( ConnectionPool.class.getSimpleName() ); } @@ -82,13 +86,23 @@ public CompletionStage acquire( BoltServerAddress address ) assertNotClosed(); ChannelPool pool = getOrCreatePool( address ); + + ListenerEvent acquireEvent = metricsListener.createListenerEvent(); + metricsListener.beforeAcquiringOrCreating( address, acquireEvent ); Future connectionFuture = pool.acquire(); return Futures.asCompletionStage( connectionFuture ).handle( ( channel, error ) -> { - processAcquisitionError( error ); - assertNotClosed( address, channel, pool ); - return new NettyConnection( channel, pool, clock ); + try + { + processAcquisitionError( error ); + assertNotClosed( address, channel, pool ); + return new NettyConnection( channel, pool, clock, metricsListener ); + } + finally + { + metricsListener.afterAcquiringOrCreating( address, acquireEvent ); + } } ); } @@ -99,7 +113,7 @@ public void retainAll( Set addressesToRetain ) { if ( !addressesToRetain.contains( address ) ) { - int activeChannels = activeChannelTracker.activeChannelCount( address ); + int activeChannels = nettyChannelTracker.inUseChannelCount( address ); if ( activeChannels == 0 ) { // address is not present in updated routing table and has no active connections @@ -118,9 +132,15 @@ public void retainAll( Set addressesToRetain ) } @Override - public int activeConnections( BoltServerAddress address ) + public int inUseConnections( BoltServerAddress address ) + { + return nettyChannelTracker.inUseChannelCount( address ); + } + + @Override + public int idleConnections( BoltServerAddress address ) { - return activeChannelTracker.activeChannelCount( address ); + return nettyChannelTracker.idleChannelCount( address ); } @Override @@ -150,26 +170,38 @@ public CompletionStage close() .thenApply( ignore -> null ); } + @Override + public boolean isOpen() + { + return !closed.get(); + } + private ChannelPool getOrCreatePool( BoltServerAddress address ) { ChannelPool pool = pools.get( address ); - if ( pool == null ) + if ( pool != null ) { - pool = newPool( address ); + return pool; + } - if ( pools.putIfAbsent( address, pool ) != null ) + synchronized ( this ) + { + pool = pools.get( address ); + if ( pool != null ) { - // We lost a race to create the pool, dispose of the one we created, and recurse - pool.close(); - return getOrCreatePool( address ); + return pool; } + + metricsListener.addMetrics( address, this ); + pool = newPool( address ); + pools.put( address, pool ); } return pool; } ChannelPool newPool( BoltServerAddress address ) { - return new NettyChannelPool( address, connector, bootstrap, activeChannelTracker, channelHealthChecker, + return new NettyChannelPool( address, connector, bootstrap, nettyChannelTracker, channelHealthChecker, settings.connectionAcquisitionTimeout(), settings.maxConnectionPoolSize() ); } @@ -217,4 +249,10 @@ private void assertNotClosed( BoltServerAddress address, Channel channel, Channe assertNotClosed(); } } + + @Override + public String toString() + { + return "ConnectionPoolImpl{" + "pools=" + pools + '}'; + } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelPool.java b/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelPool.java index 893f189f7d..e350ae0388 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelPool.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelPool.java @@ -19,13 +19,14 @@ package org.neo4j.driver.internal.async.pool; import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.pool.ChannelHealthChecker; -import io.netty.channel.pool.ChannelPoolHandler; import io.netty.channel.pool.FixedChannelPool; import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.internal.async.ChannelConnector; +import org.neo4j.driver.internal.metrics.ListenerEvent; import static java.util.Objects.requireNonNull; @@ -42,9 +43,10 @@ public class NettyChannelPool extends FixedChannelPool private final BoltServerAddress address; private final ChannelConnector connector; + private final NettyChannelTracker handler; public NettyChannelPool( BoltServerAddress address, ChannelConnector connector, Bootstrap bootstrap, - ChannelPoolHandler handler, ChannelHealthChecker healthCheck, long acquireTimeoutMillis, + NettyChannelTracker handler, ChannelHealthChecker healthCheck, long acquireTimeoutMillis, int maxConnections ) { super( bootstrap, handler, healthCheck, AcquireTimeoutAction.FAIL, acquireTimeoutMillis, maxConnections, @@ -52,19 +54,28 @@ public NettyChannelPool( BoltServerAddress address, ChannelConnector connector, this.address = requireNonNull( address ); this.connector = requireNonNull( connector ); + this.handler = requireNonNull( handler ); } @Override protected ChannelFuture connectChannel( Bootstrap bootstrap ) { + ListenerEvent creatingEvent = handler.beforeChannelCreating( address ); ChannelFuture channelFuture = connector.connect( address, bootstrap ); channelFuture.addListener( future -> { if ( future.isSuccess() ) { // notify pool handler about a successful connection - handler().channelCreated( channelFuture.channel() ); + Channel channel = channelFuture.channel(); + handler.channelCreated( channel ); + channel.closeFuture().addListener( closeFuture -> handler.channelClosed( channel ) ); } + else + { + handler.channelFailedToCreate( address ); + } + handler.afterChannelCreating( address, creatingEvent ); } ); return channelFuture; } diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelTracker.java b/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelTracker.java new file mode 100644 index 0000000000..11e98cbad0 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelTracker.java @@ -0,0 +1,145 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.async.pool; + +import io.netty.channel.Channel; +import io.netty.channel.pool.ChannelPoolHandler; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +import org.neo4j.driver.internal.BoltServerAddress; +import org.neo4j.driver.internal.metrics.ListenerEvent; +import org.neo4j.driver.internal.metrics.MetricsListener; +import org.neo4j.driver.v1.Logger; +import org.neo4j.driver.v1.Logging; + +import static org.neo4j.driver.internal.async.ChannelAttributes.serverAddress; + +public class NettyChannelTracker implements ChannelPoolHandler +{ + private final Map addressToInUseChannelCount = new ConcurrentHashMap<>(); + private final Map addressToIdleChannelCount = new ConcurrentHashMap<>(); + private final Logger log; + private final MetricsListener metricsListener; + + public NettyChannelTracker( MetricsListener metricsListener, Logging logging ) + { + this.metricsListener = metricsListener; + this.log = logging.getLog( getClass().getSimpleName() ); + } + + @Override + public void channelReleased( Channel channel ) + { + log.debug( "Channel %s released back to the pool", channel ); + decrementInUse( channel ); + incrementIdle( channel ); + } + + @Override + public void channelAcquired( Channel channel ) + { + log.debug( "Channel %s acquired from the pool", channel ); + incrementInUse( channel ); + decrementIdle( channel ); + } + + @Override + public void channelCreated( Channel channel ) + { + log.debug( "Channel %s created", channel ); + incrementInUse( channel ); + metricsListener.afterCreated( serverAddress( channel ) ); + } + + public void channelFailedToCreate( BoltServerAddress address ) + { + metricsListener.afterFailedToCreate( address ); + } + + public ListenerEvent beforeChannelCreating( BoltServerAddress address ) + { + ListenerEvent creatingEvent = metricsListener.createListenerEvent(); + metricsListener.beforeCreating( address, creatingEvent ); + return creatingEvent; + } + + public void afterChannelCreating( BoltServerAddress address, ListenerEvent creatingEvent ) + { + metricsListener.afterCreating( address, creatingEvent ); + } + + public void channelClosed( Channel channel ) + { + decrementIdle( channel ); + metricsListener.afterClosed( serverAddress( channel ) ); + } + + public int inUseChannelCount( BoltServerAddress address ) + { + AtomicInteger count = addressToInUseChannelCount.get( address ); + return count == null ? 0 : count.get(); + } + + public int idleChannelCount( BoltServerAddress address ) + { + AtomicInteger count = addressToIdleChannelCount.get( address ); + return count == null ? 0 : count.get(); + } + + private void incrementInUse( Channel channel ) + { + increment( channel, addressToInUseChannelCount ); + } + + private void decrementInUse( Channel channel ) + { + decrement( channel, addressToInUseChannelCount ); + } + + private void incrementIdle( Channel channel ) + { + increment( channel, addressToIdleChannelCount ); + } + + private void decrementIdle( Channel channel ) + { + decrement( channel, addressToIdleChannelCount ); + } + + private void increment( Channel channel, Map countMap ) + { + BoltServerAddress address = serverAddress( channel ); + AtomicInteger count = countMap.computeIfAbsent( address, k -> new AtomicInteger() ); + count.incrementAndGet(); + } + + private void decrement( Channel channel, Map countMap ) + { + BoltServerAddress address = serverAddress( channel ); + AtomicInteger count = countMap.get( address ); + if ( count == null ) + { + throw new IllegalStateException( "No count exist for address '" + address + "'" ); + } + count.decrementAndGet(); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LeastConnectedLoadBalancingStrategy.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LeastConnectedLoadBalancingStrategy.java index ff7df31fec..c85ff26254 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LeastConnectedLoadBalancingStrategy.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LeastConnectedLoadBalancingStrategy.java @@ -77,7 +77,7 @@ private BoltServerAddress select( BoltServerAddress[] addresses, RoundRobinArray do { BoltServerAddress address = addresses[index]; - int activeConnections = connectionPool.activeConnections( address ); + int activeConnections = connectionPool.inUseConnections( address ); if ( activeConnections < leastActiveConnections ) { diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/ConnectionMetricsListener.java b/driver/src/main/java/org/neo4j/driver/internal/metrics/ConnectionMetricsListener.java new file mode 100644 index 0000000000..a8b55a2787 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/metrics/ConnectionMetricsListener.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.metrics; + + +public interface ConnectionMetricsListener +{ + void beforeCreating( ListenerEvent listenerEvent ); + + void afterCreating( ListenerEvent listenerEvent ); + + void acquiredOrCreated( ListenerEvent listenerEvent ); + + void released(ListenerEvent listenerEvent); +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/ConnectionPoolMetricsListener.java b/driver/src/main/java/org/neo4j/driver/internal/metrics/ConnectionPoolMetricsListener.java new file mode 100644 index 0000000000..93f414c7f9 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/metrics/ConnectionPoolMetricsListener.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.metrics; + +public interface ConnectionPoolMetricsListener +{ + void beforeCreating(); + + void afterCreated(); + + void afterFailedToCreate(); + + void afterClosed(); + + void beforeAcquiringOrCreating( ListenerEvent listenerEvent ); + + void afterAcquiringOrCreating( ListenerEvent listenerEvent ); +} + diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/HistogramSnapshot.java b/driver/src/main/java/org/neo4j/driver/internal/metrics/HistogramSnapshot.java new file mode 100644 index 0000000000..8959153abe --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/metrics/HistogramSnapshot.java @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.metrics; + +import org.neo4j.driver.internal.metrics.spi.Histogram; + +public class HistogramSnapshot implements Histogram +{ + private final Histogram copy; + private final Histogram origin; + + public HistogramSnapshot( Histogram copy, Histogram origin ) + { + this.copy = copy; + this.origin = origin; + } + + @Override + public long min() + { + return copy.min(); + } + + @Override + public long max() + { + return copy.max(); + } + + @Override + public double mean() + { + return copy.mean(); + } + + @Override + public double stdDeviation() + { + return copy.stdDeviation(); + } + + @Override + public long totalCount() + { + return copy.totalCount(); + } + + @Override + public long valueAtPercentile( double percentile ) + { + return copy.valueAtPercentile( percentile ); + } + + @Override + public void reset() + { + origin.reset(); + } + + @Override + public String toString() + { + return copy.toString(); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/InternalAbstractMetrics.java b/driver/src/main/java/org/neo4j/driver/internal/metrics/InternalAbstractMetrics.java new file mode 100644 index 0000000000..959bfe8361 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/metrics/InternalAbstractMetrics.java @@ -0,0 +1,111 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.metrics; + +import java.util.Collections; +import java.util.Map; + +import org.neo4j.driver.internal.BoltServerAddress; +import org.neo4j.driver.internal.async.pool.ConnectionPoolImpl; +import org.neo4j.driver.internal.metrics.spi.ConnectionMetrics; +import org.neo4j.driver.internal.metrics.spi.ConnectionPoolMetrics; +import org.neo4j.driver.internal.metrics.spi.Metrics; + +public abstract class InternalAbstractMetrics implements Metrics, MetricsListener +{ + public static final InternalAbstractMetrics DEV_NULL_METRICS = new InternalAbstractMetrics() + { + + @Override + public void beforeCreating( BoltServerAddress serverAddress, ListenerEvent creatingEvent ) + { + } + + @Override + public void afterCreating( BoltServerAddress serverAddress, ListenerEvent creatingEvent ) + { + } + + @Override + public void afterCreated( BoltServerAddress serverAddress ) + { + } + + @Override + public void afterFailedToCreate( BoltServerAddress serverAddress ) + { + } + + @Override + public void afterClosed( BoltServerAddress serverAddress ) + { + } + + @Override + public void beforeAcquiringOrCreating( BoltServerAddress serverAddress, ListenerEvent acquireEvent ) + { + } + + @Override + public void afterAcquiringOrCreating( BoltServerAddress serverAddress, ListenerEvent acquireEvent ) + { + } + + @Override + public void afterAcquiredOrCreated( BoltServerAddress serverAddress, ListenerEvent inUseEvent ) + { + } + + @Override + public void afterReleased( BoltServerAddress serverAddress, ListenerEvent inUseEvent ) + { + } + + @Override + public ListenerEvent createListenerEvent() + { + return null; + } + + + @Override + public void addMetrics( BoltServerAddress address, ConnectionPoolImpl connectionPool ) + { + + } + + @Override + public Map connectionPoolMetrics() + { + return Collections.emptyMap(); + } + + @Override + public Map connectionMetrics() + { + return Collections.emptyMap(); + } + + @Override + public String toString() + { + return "Driver metrics not available while driver metrics is not enabled."; + } + }; +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/InternalConnectionMetrics.java b/driver/src/main/java/org/neo4j/driver/internal/metrics/InternalConnectionMetrics.java new file mode 100644 index 0000000000..6db36f3b71 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/metrics/InternalConnectionMetrics.java @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.metrics; + +import java.time.Duration; +import java.util.Objects; + +import org.neo4j.driver.internal.BoltServerAddress; +import org.neo4j.driver.internal.metrics.spi.ConnectionMetrics; +import org.neo4j.driver.internal.metrics.spi.Histogram; + + +public class InternalConnectionMetrics implements ConnectionMetrics, ConnectionMetricsListener +{ + private final InternalHistogram connHistogram; + private final InternalHistogram inUseHistogram; + private final BoltServerAddress serverAddress; + + public InternalConnectionMetrics( BoltServerAddress serverAddress, int connectionTimeoutMillis ) + { + Objects.requireNonNull( serverAddress ); + this.serverAddress = serverAddress; + connHistogram = new InternalHistogram( Duration.ofMillis( connectionTimeoutMillis ).toNanos() ); + inUseHistogram = new InternalHistogram(); + } + + @Override + public String uniqueName() + { + return serverAddress.toString(); + } + + @Override + public Histogram connectionTimeHistogram() + { + return connHistogram.snapshot(); + } + + @Override + public Histogram inUseTimeHistogram() + { + return inUseHistogram.snapshot(); + } + + @Override + public void beforeCreating( ListenerEvent connEvent ) + { + // creating a conn + connEvent.start(); + } + + @Override + public void afterCreating( ListenerEvent connEvent ) + { + // finished conn creation + long elapsed = connEvent.elapsed(); + connHistogram.recordValue( elapsed ); + } + + @Override + public void acquiredOrCreated( ListenerEvent inUseEvent ) + { + // created + acquired = inUse + inUseEvent.start(); + } + + @Override + public void released(ListenerEvent inUseEvent) + { + // idle + long elapsed = inUseEvent.elapsed(); + inUseHistogram.recordValue( elapsed ); + } + + @Override + public String toString() + { + return String.format( "connectionTimeHistogram=%s, inUseTimeHistogram=%s", connectionTimeHistogram(), inUseTimeHistogram() ); + } + +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/InternalConnectionPoolMetrics.java b/driver/src/main/java/org/neo4j/driver/internal/metrics/InternalConnectionPoolMetrics.java new file mode 100644 index 0000000000..47130fef65 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/metrics/InternalConnectionPoolMetrics.java @@ -0,0 +1,164 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.neo4j.driver.internal.metrics; + +import java.time.Duration; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.neo4j.driver.internal.BoltServerAddress; +import org.neo4j.driver.internal.metrics.spi.ConnectionPoolMetrics; +import org.neo4j.driver.internal.metrics.spi.Histogram; +import org.neo4j.driver.internal.metrics.spi.PoolStatus; +import org.neo4j.driver.internal.spi.ConnectionPool; + +import static java.lang.String.format; +import static org.neo4j.driver.internal.metrics.InternalMetrics.serverAddressToUniqueName; + +public class InternalConnectionPoolMetrics implements ConnectionPoolMetrics, ConnectionPoolMetricsListener +{ + private final BoltServerAddress address; + private final ConnectionPool pool; + + private AtomicLong created = new AtomicLong(); + private AtomicLong closed = new AtomicLong(); + private AtomicInteger creating = new AtomicInteger(); + private AtomicLong failedToCreate = new AtomicLong(); + + private InternalHistogram acquisitionTimeHistogram; + + public InternalConnectionPoolMetrics( BoltServerAddress address, ConnectionPool pool, long connAcquisitionTimeoutMs ) + { + Objects.requireNonNull( address ); + Objects.requireNonNull( pool ); + + this.address = address; + this.pool = pool; + this.acquisitionTimeHistogram = new InternalHistogram( Duration.ofMillis( connAcquisitionTimeoutMs ).toNanos() ); + } + + @Override + public void beforeCreating() + { + creating.incrementAndGet(); + } + + @Override + public void afterFailedToCreate() + { + failedToCreate.incrementAndGet(); + creating.decrementAndGet(); + } + + @Override + public void afterCreated() + { + created.incrementAndGet(); + creating.decrementAndGet(); + } + + @Override + public void afterClosed() + { + closed.incrementAndGet(); + } + + @Override + public void beforeAcquiringOrCreating( ListenerEvent listenerEvent ) + { + listenerEvent.start(); + } + + @Override + public void afterAcquiringOrCreating( ListenerEvent listenerEvent ) + { + long elapsed = listenerEvent.elapsed(); + acquisitionTimeHistogram.recordValue( elapsed ); + } + + @Override + public String uniqueName() + { + return serverAddressToUniqueName( address ); + } + + @Override + public PoolStatus poolStatus() + { + if ( pool.isOpen() ) + { + return PoolStatus.Open; + } + else + { + return PoolStatus.Closed; + } + } + + @Override + public int inUse() + { + return pool.inUseConnections( address ); + } + + @Override + public int idle() + { + return pool.idleConnections( address ); + } + + @Override + public int creating() + { + return creating.get(); + } + + @Override + public long created() + { + return created.get(); + } + + @Override + public long failedToCreate() + { + return failedToCreate.get(); + } + + @Override + public long closed() + { + return closed.get(); + } + + @Override + public Histogram acquisitionTimeHistogram() + { + return this.acquisitionTimeHistogram.snapshot(); + } + + @Override + public String toString() + { + return format( "[created=%s, closed=%s, creating=%s, failedToCreate=%s inUse=%s, idle=%s, poolStatus=%s, acquisitionTimeHistogram=%s]", created(), + closed(), creating(), failedToCreate(), inUse(), idle(), poolStatus(), acquisitionTimeHistogram() ); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/InternalHistogram.java b/driver/src/main/java/org/neo4j/driver/internal/metrics/InternalHistogram.java new file mode 100644 index 0000000000..d0a89e995b --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/metrics/InternalHistogram.java @@ -0,0 +1,128 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.metrics; + +import org.HdrHistogram.AbstractHistogram; +import org.HdrHistogram.ConcurrentHistogram; + +import java.time.Duration; + +import org.neo4j.driver.internal.metrics.spi.Histogram; + +import static java.lang.String.format; + +public class InternalHistogram implements Histogram +{ + private static final long DEFAULT_HIGHEST_TRACKABLE_NS = Duration.ofMinutes( 10 ).toNanos(); + private static final int DEFAULT_NUMBER_OF_SIGNIFICANT_VALUE_DIGITS = 3; + + private final AbstractHistogram delegate; + + public InternalHistogram() + { + this( DEFAULT_HIGHEST_TRACKABLE_NS ); + } + + public InternalHistogram( long highestTrackableValueNS ) + { + this.delegate = createHdrHistogram( highestTrackableValueNS ); + } + + public InternalHistogram( AbstractHistogram histogram ) + { + this.delegate = histogram; + } + + public void recordValue( long value ) + { + long newValue = truncateValue( value, delegate ); + this.delegate.recordValue( newValue ); + } + + @Override + public long min() + { + return delegate.getMinValue(); + } + + @Override + public long max() + { + return delegate.getMaxValue(); + } + + @Override + public double mean() + { + return delegate.getMean(); + } + + @Override + public double stdDeviation() + { + return delegate.getStdDeviation(); + } + + @Override + public long totalCount() + { + return delegate.getTotalCount(); + } + + @Override + public long valueAtPercentile( double percentile ) + { + return delegate.getValueAtPercentile( percentile ); + } + + @Override + public void reset() + { + delegate.reset(); + } + + public static ConcurrentHistogram createHdrHistogram( long highestTrackableValue ) + { + return new ConcurrentHistogram( highestTrackableValue, DEFAULT_NUMBER_OF_SIGNIFICANT_VALUE_DIGITS ); + } + + private static long truncateValue( long value, AbstractHistogram histogram ) + { + if ( value > histogram.getHighestTrackableValue() ) + { + return histogram.getHighestTrackableValue(); + } + else + { + return value; + } + } + + public Histogram snapshot() + { + return new HistogramSnapshot( new InternalHistogram( this.delegate.copy() ), this ); + } + + @Override + public String toString() + { + return format("[min=%sns, max=%sns, mean=%sns, stdDeviation=%s, totalCount=%s]", + min(), max(), mean(), stdDeviation(), totalCount()); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/InternalMetrics.java b/driver/src/main/java/org/neo4j/driver/internal/metrics/InternalMetrics.java new file mode 100644 index 0000000000..23b83587b7 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/metrics/InternalMetrics.java @@ -0,0 +1,172 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.metrics; + +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +import org.neo4j.driver.internal.BoltServerAddress; +import org.neo4j.driver.internal.async.pool.ConnectionPoolImpl; +import org.neo4j.driver.internal.metrics.spi.ConnectionMetrics; +import org.neo4j.driver.internal.metrics.spi.ConnectionPoolMetrics; +import org.neo4j.driver.internal.spi.ConnectionPool; +import org.neo4j.driver.v1.Config; +import org.neo4j.driver.v1.exceptions.ClientException; + +import static java.lang.String.format; + +public class InternalMetrics extends InternalAbstractMetrics +{ + private final Map connectionPoolMetrics; + private final Map connectionMetrics; + private final Config config; + + public InternalMetrics( Config config ) + { + Objects.requireNonNull( config ); + this.config = config; + this.connectionPoolMetrics = new ConcurrentHashMap<>(); + this.connectionMetrics = new ConcurrentHashMap<>(); + } + + @Override + public void addMetrics( BoltServerAddress serverAddress, ConnectionPoolImpl pool ) + { + addPoolMetrics( serverAddress, pool ); + addConnectionMetrics( serverAddress ); + } + + @Override + public void beforeCreating( BoltServerAddress serverAddress, ListenerEvent creatingEvent ) + { + poolMetrics( serverAddress ).beforeCreating(); + connectionMetrics( serverAddress ).beforeCreating( creatingEvent ); + } + + @Override + public void afterCreating( BoltServerAddress serverAddress, ListenerEvent creatingEvent ) + { + connectionMetrics( serverAddress ).afterCreating( creatingEvent ); + } + + @Override + public void afterCreated( BoltServerAddress serverAddress ) + { + poolMetrics( serverAddress ).afterCreated(); + } + + @Override + public void afterFailedToCreate( BoltServerAddress serverAddress ) + { + poolMetrics( serverAddress ).afterFailedToCreate(); + } + + @Override + public void afterClosed( BoltServerAddress serverAddress ) + { + poolMetrics( serverAddress ).afterClosed(); + } + + @Override + public void beforeAcquiringOrCreating( BoltServerAddress serverAddress, ListenerEvent listenerEvent ) + { + poolMetrics( serverAddress ).beforeAcquiringOrCreating( listenerEvent ); + } + + @Override + public void afterAcquiringOrCreating( BoltServerAddress serverAddress, ListenerEvent listenerEvent ) + { + poolMetrics( serverAddress ).afterAcquiringOrCreating( listenerEvent ); + } + + @Override + public void afterAcquiredOrCreated( BoltServerAddress serverAddress, ListenerEvent inUseEvent ) + { + connectionMetrics( serverAddress ).acquiredOrCreated( inUseEvent ); + } + + @Override + public void afterReleased( BoltServerAddress serverAddress, ListenerEvent inUseEvent ) + { + connectionMetrics( serverAddress ).released( inUseEvent ); + } + + @Override + public ListenerEvent createListenerEvent() + { + return new NanoTimeBasedListenerEvent(); + } + + @Override + public Map connectionPoolMetrics() + { + return this.connectionPoolMetrics; + } + + @Override + public Map connectionMetrics() + { + return this.connectionMetrics; + } + + @Override + public String toString() + { + return format( "PoolMetrics=%s, ConnMetrics=%s", connectionPoolMetrics, connectionMetrics ); + } + + static String serverAddressToUniqueName( BoltServerAddress serverAddress ) + { + return serverAddress.toString(); + } + + private ConnectionPoolMetricsListener poolMetrics( BoltServerAddress serverAddress ) + { + InternalConnectionPoolMetrics poolMetrics = + (InternalConnectionPoolMetrics) this.connectionPoolMetrics.get( serverAddressToUniqueName( serverAddress ) ); + if ( poolMetrics == null ) + { + throw new ClientException( format( "Failed to find pool metrics for server `%s` in %s", serverAddress, this.connectionPoolMetrics ) ); + } + return poolMetrics; + } + + private ConnectionMetricsListener connectionMetrics( BoltServerAddress serverAddress ) + { + InternalConnectionMetrics connMetrics = (InternalConnectionMetrics) this.connectionMetrics.get( serverAddressToUniqueName( serverAddress ) ); + if ( connMetrics == null ) + { + throw new ClientException( format( "Failed to find connection metrics for server `%s` in %s", serverAddress, this.connectionMetrics ) ); + } + return connMetrics; + } + + private void addPoolMetrics( BoltServerAddress serverAddress, ConnectionPool pool ) + { + this.connectionPoolMetrics.put( serverAddressToUniqueName( serverAddress ), + new InternalConnectionPoolMetrics( serverAddress, pool, config.connectionAcquisitionTimeoutMillis() ) ); + } + + private void addConnectionMetrics( BoltServerAddress serverAddress ) + { + this.connectionMetrics.put( serverAddressToUniqueName( serverAddress ), + new InternalConnectionMetrics( serverAddress, config.connectionTimeoutMillis() ) ); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/ListenerEvent.java b/driver/src/main/java/org/neo4j/driver/internal/metrics/ListenerEvent.java new file mode 100644 index 0000000000..6eece7d235 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/metrics/ListenerEvent.java @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.metrics; + + +public interface ListenerEvent +{ + void start(); + long elapsed(); +} + diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/MetricsListener.java b/driver/src/main/java/org/neo4j/driver/internal/metrics/MetricsListener.java new file mode 100644 index 0000000000..41d337b1ac --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/metrics/MetricsListener.java @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.metrics; + +import org.neo4j.driver.internal.BoltServerAddress; +import org.neo4j.driver.internal.async.NettyConnection; +import org.neo4j.driver.internal.async.pool.ConnectionPoolImpl; + +public interface MetricsListener +{ + /** + * Before creating a netty channel + * @param serverAddress the server the netty channel binds to. + * @param creatingEvent a connection listener event registered when a connection is creating. + */ + void beforeCreating( BoltServerAddress serverAddress, ListenerEvent creatingEvent ); + + /** + * After creating a netty channel regardless succeeded or failed. + * @param serverAddress the server the netty channel binds to. + * @param creatingEvent a connection listener event registered when a connection is creating. + */ + void afterCreating( BoltServerAddress serverAddress, ListenerEvent creatingEvent ); + + /** + * After a netty channel is created successfully + * This method will not invoke {@link this#afterCreating(BoltServerAddress, ListenerEvent)} + * @param serverAddress the server the netty channel binds to + */ + void afterCreated( BoltServerAddress serverAddress ); + + /** + * After a netty channel is created with failure + * This method will not invoke {@link this#afterCreating(BoltServerAddress, ListenerEvent)} + * @param serverAddress the server the netty channel binds to + */ + void afterFailedToCreate( BoltServerAddress serverAddress ); + + /** + * After a netty channel is closed successfully + * @param serverAddress the server the netty channel binds to + */ + void afterClosed( BoltServerAddress serverAddress ); + + /** + * Before acquiring or creating a new netty channel from pool + * @param serverAddress the server the netty channel binds to + * @param acquireEvent a pool listener event registered in pool for this acquire event + */ + void beforeAcquiringOrCreating( BoltServerAddress serverAddress, ListenerEvent acquireEvent ); + + /** + * After acquiring or creating a new netty channel from pool regardless succeeded or failed + * @param serverAddress the server the netty channel binds to + * @param acquireEvent a pool listener event registered in pool for this acquire event + */ + void afterAcquiringOrCreating( BoltServerAddress serverAddress, ListenerEvent acquireEvent ); + + /** + * After acquiring or creating a new netty channel from pool successfully. + * This method will not invoke {@link this#afterAcquiringOrCreating(BoltServerAddress, ListenerEvent)} + * @param serverAddress the server the netty channel binds to + * @param inUseEvent a connection listener registered with a {@link NettyConnection} when created + */ + void afterAcquiredOrCreated( BoltServerAddress serverAddress, ListenerEvent inUseEvent ); + + /** + * After releasing a netty channel back to pool successfully + * @param serverAddress the server the netty channel binds to + * @param inUseEvent a connection listener registered with a {@link NettyConnection} when destroyed + */ + void afterReleased( BoltServerAddress serverAddress, ListenerEvent inUseEvent ); + + ListenerEvent createListenerEvent(); + + void addMetrics( BoltServerAddress address, ConnectionPoolImpl connectionPool ); +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/NanoTimeBasedListenerEvent.java b/driver/src/main/java/org/neo4j/driver/internal/metrics/NanoTimeBasedListenerEvent.java new file mode 100644 index 0000000000..a2c315c4fd --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/metrics/NanoTimeBasedListenerEvent.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.neo4j.driver.internal.metrics; + +public class NanoTimeBasedListenerEvent implements ListenerEvent +{ + private long startNanoTime; + + @Override + public void start() + { + startNanoTime = System.nanoTime(); + } + + @Override + public long elapsed() + { + return System.nanoTime() - startNanoTime; + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/spi/ConnectionMetrics.java b/driver/src/main/java/org/neo4j/driver/internal/metrics/spi/ConnectionMetrics.java new file mode 100644 index 0000000000..a0f5f773bb --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/metrics/spi/ConnectionMetrics.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.metrics.spi; + +public interface ConnectionMetrics +{ + /** + * The unique name of this connection metrics among all connection metrics + * @return An unique name of this connection metrics among all connection metrics + */ + String uniqueName(); + + /** + * The connection time histogram describes how long it takes to establish a connection + * @return The connection time histogram + */ + Histogram connectionTimeHistogram (); + + /** + * The in-use time histogram records how long each connection is borrowed out of the pool + * @return The in-use time histogram + */ + Histogram inUseTimeHistogram(); +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/spi/ConnectionPoolMetrics.java b/driver/src/main/java/org/neo4j/driver/internal/metrics/spi/ConnectionPoolMetrics.java new file mode 100644 index 0000000000..40a8afddab --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/metrics/spi/ConnectionPoolMetrics.java @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.metrics.spi; + +public interface ConnectionPoolMetrics +{ + /** + * An unique name that identifies this connection pool metrics among all others + * @return An unique name + */ + String uniqueName(); + + /** + * The status of the pool + * @return The status of the pool in a string + */ + PoolStatus poolStatus(); + + /** + * The amount of connections that is in-use (borrowed out of the pool). + * The number is changing up and down from time to time. + * @return The amount of connections that is in-use + */ + int inUse(); + + /** + * The amount of connections that is idle (buffered inside the pool). + * The number is changing up and down from time to time. + * @return The amount of connections that is idle. + */ + int idle(); + + /** + * The amount of connections that is going to be created. + * The amount is increased by one when the pool noticed a request to create a new connection. + * The amount is decreased by one when the pool noticed a new connection is created regardless successfully or not. + * The number is changing up and down from time to time. + * @return The amount of connection that is creating. + */ + int creating(); + + /** + * An increasing-only number to record how many connections have been created by this pool successfully. + * @return The amount of connections have ever been created by this pool. + */ + long created(); + + /** + * An increasing-only number to record how many connections have been failed to create. + * @return The amount of connections have been failed to create by this pool. + */ + long failedToCreate(); + + /** + * An increasing-only number to record how many connections have been closed by this pool. + * @return The amount of connections have been closed by this pool. + */ + long closed(); + + /** + * An acquisition time histogram records how long it takes to acquire an connection from this pool. + * The connection acquired from the pool could either be a connection idling inside the pool or a connection created by the pool. + * @return The acquisition time histogram. + */ + Histogram acquisitionTimeHistogram(); +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/spi/Histogram.java b/driver/src/main/java/org/neo4j/driver/internal/metrics/spi/Histogram.java new file mode 100644 index 0000000000..a05948d48c --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/metrics/spi/Histogram.java @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.metrics.spi; + +public interface Histogram +{ + /** + * The minimum value recorded in this histogram. + * @return The minimum value recorded in this histogram. + */ + long min(); + + /** + * The maximum value recorded in this histogram. + * @return The maximum value recorded in this histogram. + */ + long max(); + + /** + * The mean value of this histogram. + * @return The mean value. + */ + double mean(); + + /** + * The standard deviation of this histogram. + * @return The standard deviation. + */ + double stdDeviation(); + + /** + * The total count of the values recorded in this histogram. + * @return The total number of values. + */ + long totalCount(); + + /** + * Returns the value at the given percentile. + * @param percentile The interested percentile such as 50 (mean value), 80 etc. + * @return The value at the given percentile. + */ + long valueAtPercentile(double percentile); + + /** + * Reset cleans all the values recorded in this histogram. + */ + void reset(); +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/spi/Metrics.java b/driver/src/main/java/org/neo4j/driver/internal/metrics/spi/Metrics.java new file mode 100644 index 0000000000..ca04a88d62 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/metrics/spi/Metrics.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.metrics.spi; + +import java.util.Map; + +public interface Metrics +{ + // TODO Once this interface become public, find a better way to enable metrics and detect metrics availability. + String DRIVER_METRICS_ENABLED_KEY = "driver.metrics.enabled"; + static boolean isMetricsEnabled() + { + return Boolean.getBoolean( DRIVER_METRICS_ENABLED_KEY ); + } + + /** + * A map of connection pool metrics. + * The {@link ConnectionPoolMetrics#uniqueName()} are used as the keys of the map. + * @return The connection pool metrics. + */ + Map connectionPoolMetrics(); + + /*** + * A map of connection metrics. + * The {@link ConnectionMetrics#uniqueName()} are used as the keys of the map. + * @return The connection metrics. + */ + Map connectionMetrics(); + +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/spi/PoolStatus.java b/driver/src/main/java/org/neo4j/driver/internal/metrics/spi/PoolStatus.java new file mode 100644 index 0000000000..6fe7099805 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/metrics/spi/PoolStatus.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.metrics.spi; + +public enum PoolStatus +{ + Open, Closed +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionPool.java b/driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionPool.java index ca3de53f41..73c7f6d86b 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionPool.java +++ b/driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionPool.java @@ -29,7 +29,11 @@ public interface ConnectionPool void retainAll( Set addressesToRetain ); - int activeConnections( BoltServerAddress address ); + int inUseConnections( BoltServerAddress address ); + + int idleConnections( BoltServerAddress address ); CompletionStage close(); + + boolean isOpen(); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java b/driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java index 5235dc0618..0257ee2cce 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java @@ -33,6 +33,9 @@ import org.neo4j.driver.internal.async.BootstrapFactory; import org.neo4j.driver.internal.cluster.RoutingSettings; import org.neo4j.driver.internal.cluster.loadbalancing.LoadBalancer; +import org.neo4j.driver.internal.metrics.InternalMetrics; +import org.neo4j.driver.internal.metrics.MetricsListener; +import org.neo4j.driver.internal.metrics.spi.Metrics; import org.neo4j.driver.internal.retry.RetryLogic; import org.neo4j.driver.internal.retry.RetrySettings; import org.neo4j.driver.internal.security.SecurityPlan; @@ -47,6 +50,7 @@ import static java.util.concurrent.CompletableFuture.completedFuture; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -56,6 +60,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.neo4j.driver.internal.metrics.InternalAbstractMetrics.DEV_NULL_METRICS; import static org.neo4j.driver.internal.util.Futures.completedWithNull; import static org.neo4j.driver.internal.util.Futures.failedFuture; import static org.neo4j.driver.v1.AccessMode.READ; @@ -175,6 +180,30 @@ public void shouldThrowWhenUnableToVerifyConnectivity() } } + @Test + public void shouldNotCreateDriverMetrics() throws Throwable + { + // Given + Config config = mock( Config.class ); + // When + MetricsListener handler = DriverFactory.createDriverMetrics( config ); + // Then + assertThat( handler, is( DEV_NULL_METRICS ) ); + } + + @Test + public void shouldCreateDriverMetricsIfMonitoringEnabled() throws Throwable + { + // Given + Config config = mock( Config.class ); + System.setProperty( "driver.metrics.enabled", "True" ); + // When + MetricsListener handler = DriverFactory.createDriverMetrics( config ); + System.setProperty( "driver.metrics.enabled", "False" ); + // Then + assertThat( handler instanceof InternalMetrics, is( true ) ); + } + private Driver createDriver( DriverFactory driverFactory ) { return createDriver( driverFactory, defaultConfig() ); @@ -206,22 +235,20 @@ private static class ThrowingDriverFactory extends DriverFactory } @Override - protected InternalDriver createDriver( SessionFactory sessionFactory, SecurityPlan securityPlan, Config config ) + protected InternalDriver createDriver( SecurityPlan securityPlan, SessionFactory sessionFactory, Metrics metrics, Config config ) { throw new UnsupportedOperationException( "Can't create direct driver" ); } @Override - protected InternalDriver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool, - Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic, - EventExecutorGroup eventExecutorGroup ) + protected InternalDriver createRoutingDriver( SecurityPlan securityPlan, BoltServerAddress address, ConnectionPool connectionPool, + EventExecutorGroup eventExecutorGroup, RoutingSettings routingSettings, RetryLogic retryLogic, Metrics metrics, Config config ) { throw new UnsupportedOperationException( "Can't create routing driver" ); } @Override - protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, - Bootstrap bootstrap, Config config ) + protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap, MetricsListener metrics, Config config ) { return connectionPool; } @@ -232,7 +259,7 @@ private static class SessionFactoryCapturingDriverFactory extends DriverFactory SessionFactory capturedSessionFactory; @Override - protected InternalDriver createDriver( SessionFactory sessionFactory, SecurityPlan securityPlan, Config config ) + protected InternalDriver createDriver( SecurityPlan securityPlan, SessionFactory sessionFactory, Metrics metrics, Config config ) { InternalDriver driver = mock( InternalDriver.class ); when( driver.verifyConnectivity() ).thenReturn( completedWithNull() ); @@ -256,8 +283,7 @@ protected SessionFactory createSessionFactory( ConnectionProvider connectionProv } @Override - protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, - Bootstrap bootstrap, Config config ) + protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap, MetricsListener metrics, Config config ) { return connectionPoolMock(); } @@ -279,8 +305,7 @@ protected Bootstrap createBootstrap() } @Override - protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, - Bootstrap bootstrap, Config config ) + protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap, MetricsListener metrics, Config config ) { return connectionPoolMock(); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/InternalDriverTest.java b/driver/src/test/java/org/neo4j/driver/internal/InternalDriverTest.java index de6ce07178..9f39189cb7 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/InternalDriverTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/InternalDriverTest.java @@ -30,6 +30,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; +import static org.neo4j.driver.internal.metrics.InternalAbstractMetrics.DEV_NULL_METRICS; import static org.neo4j.driver.internal.util.Futures.completedWithNull; import static org.neo4j.driver.v1.util.TestUtil.await; @@ -72,7 +73,7 @@ public void shouldVerifyConnectivity() private static InternalDriver newDriver( SessionFactory sessionFactory ) { - return new InternalDriver( SecurityPlan.insecure(), sessionFactory, DEV_NULL_LOGGING ); + return new InternalDriver( SecurityPlan.insecure(), sessionFactory, DEV_NULL_METRICS, DEV_NULL_LOGGING ); } private static SessionFactory sessionFactoryMock() diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java index b96f39b6e8..88955e8cd0 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java @@ -60,6 +60,7 @@ import static org.neo4j.driver.internal.async.ChannelAttributes.terminationReason; import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; import static org.neo4j.driver.internal.messaging.ResetMessage.RESET; +import static org.neo4j.driver.internal.metrics.InternalAbstractMetrics.DEV_NULL_METRICS; import static org.neo4j.driver.internal.util.Iterables.single; import static org.neo4j.driver.v1.util.DaemonThreadFactory.daemon; @@ -497,7 +498,7 @@ private static NettyConnection newConnection( Channel channel ) private static NettyConnection newConnection( Channel channel, ChannelPool pool ) { - return new NettyConnection( channel, pool, new FakeClock() ); + return new NettyConnection( channel, pool, new FakeClock(), DEV_NULL_METRICS ); } private static void assertConnectionReleasedError( IllegalStateException e ) diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/pool/ActiveChannelTrackerTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/pool/ActiveChannelTrackerTest.java deleted file mode 100644 index 9c2c747c95..0000000000 --- a/driver/src/test/java/org/neo4j/driver/internal/async/pool/ActiveChannelTrackerTest.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Copyright (c) 2002-2018 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.driver.internal.async.pool; - -import io.netty.channel.Channel; -import io.netty.channel.embedded.EmbeddedChannel; -import org.junit.Test; - -import org.neo4j.driver.internal.BoltServerAddress; - -import static org.hamcrest.Matchers.instanceOf; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; -import static org.neo4j.driver.internal.async.ChannelAttributes.setServerAddress; -import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; - -public class ActiveChannelTrackerTest -{ - private final BoltServerAddress address = BoltServerAddress.LOCAL_DEFAULT; - private final ActiveChannelTracker tracker = new ActiveChannelTracker( DEV_NULL_LOGGING ); - - @Test - public void shouldIncrementCountWhenChannelCreated() - { - Channel channel = newChannel(); - assertEquals( 0, tracker.activeChannelCount( address ) ); - - tracker.channelCreated( channel ); - assertEquals( 1, tracker.activeChannelCount( address ) ); - } - - @Test - public void shouldIncrementCountForAddress() - { - Channel channel1 = newChannel(); - Channel channel2 = newChannel(); - Channel channel3 = newChannel(); - - assertEquals( 0, tracker.activeChannelCount( address ) ); - tracker.channelAcquired( channel1 ); - assertEquals( 1, tracker.activeChannelCount( address ) ); - tracker.channelAcquired( channel2 ); - assertEquals( 2, tracker.activeChannelCount( address ) ); - tracker.channelAcquired( channel3 ); - assertEquals( 3, tracker.activeChannelCount( address ) ); - } - - @Test - public void shouldDecrementCountForAddress() - { - Channel channel1 = newChannel(); - Channel channel2 = newChannel(); - Channel channel3 = newChannel(); - - tracker.channelAcquired( channel1 ); - tracker.channelAcquired( channel2 ); - tracker.channelAcquired( channel3 ); - assertEquals( 3, tracker.activeChannelCount( address ) ); - - tracker.channelReleased( channel1 ); - assertEquals( 2, tracker.activeChannelCount( address ) ); - tracker.channelReleased( channel2 ); - assertEquals( 1, tracker.activeChannelCount( address ) ); - tracker.channelReleased( channel3 ); - assertEquals( 0, tracker.activeChannelCount( address ) ); - } - - @Test - public void shouldThrowWhenDecrementingForUnknownAddress() - { - Channel channel = newChannel(); - - try - { - tracker.channelReleased( channel ); - fail( "Exception expected" ); - } - catch ( Exception e ) - { - assertThat( e, instanceOf( IllegalStateException.class ) ); - } - } - - @Test - public void shouldReturnZeroActiveCountForUnknownAddress() - { - assertEquals( 0, tracker.activeChannelCount( address ) ); - } - - private Channel newChannel() - { - EmbeddedChannel channel = new EmbeddedChannel(); - setServerAddress( channel, address ); - return channel; - } -} diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImplTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImplTest.java index f6a4c496e5..3f4f9dde66 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImplTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImplTest.java @@ -59,6 +59,7 @@ import static org.mockito.Mockito.when; import static org.neo4j.driver.internal.BoltServerAddress.LOCAL_DEFAULT; import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; +import static org.neo4j.driver.internal.metrics.InternalAbstractMetrics.DEV_NULL_METRICS; import static org.neo4j.driver.v1.util.TestUtil.await; public class ConnectionPoolImplTest @@ -146,19 +147,19 @@ public void shouldNotCloseWhenClosed() @Test public void shouldDoNothingWhenRetainOnEmptyPool() { - ActiveChannelTracker activeChannelTracker = mock( ActiveChannelTracker.class ); - TestConnectionPool pool = new TestConnectionPool( activeChannelTracker ); + NettyChannelTracker nettyChannelTracker = mock( NettyChannelTracker.class ); + TestConnectionPool pool = new TestConnectionPool( nettyChannelTracker ); pool.retainAll( singleton( LOCAL_DEFAULT ) ); - verifyZeroInteractions( activeChannelTracker ); + verifyZeroInteractions( nettyChannelTracker ); } @Test public void shouldRetainSpecifiedAddresses() { - ActiveChannelTracker activeChannelTracker = mock( ActiveChannelTracker.class ); - TestConnectionPool pool = new TestConnectionPool( activeChannelTracker ); + NettyChannelTracker nettyChannelTracker = mock( NettyChannelTracker.class ); + TestConnectionPool pool = new TestConnectionPool( nettyChannelTracker ); pool.acquire( ADDRESS_1 ); pool.acquire( ADDRESS_2 ); @@ -174,16 +175,16 @@ public void shouldRetainSpecifiedAddresses() @Test public void shouldClosePoolsWhenRetaining() { - ActiveChannelTracker activeChannelTracker = mock( ActiveChannelTracker.class ); - TestConnectionPool pool = new TestConnectionPool( activeChannelTracker ); + NettyChannelTracker nettyChannelTracker = mock( NettyChannelTracker.class ); + TestConnectionPool pool = new TestConnectionPool( nettyChannelTracker ); pool.acquire( ADDRESS_1 ); pool.acquire( ADDRESS_2 ); pool.acquire( ADDRESS_3 ); - when( activeChannelTracker.activeChannelCount( ADDRESS_1 ) ).thenReturn( 2 ); - when( activeChannelTracker.activeChannelCount( ADDRESS_2 ) ).thenReturn( 0 ); - when( activeChannelTracker.activeChannelCount( ADDRESS_3 ) ).thenReturn( 3 ); + when( nettyChannelTracker.inUseChannelCount( ADDRESS_1 ) ).thenReturn( 2 ); + when( nettyChannelTracker.inUseChannelCount( ADDRESS_2 ) ).thenReturn( 0 ); + when( nettyChannelTracker.inUseChannelCount( ADDRESS_3 ) ).thenReturn( 3 ); pool.retainAll( new HashSet<>( asList( ADDRESS_1, ADDRESS_3 ) ) ); verify( pool.getPool( ADDRESS_1 ), never() ).close(); @@ -194,16 +195,16 @@ public void shouldClosePoolsWhenRetaining() @Test public void shouldNotClosePoolsWithActiveConnectionsWhenRetaining() { - ActiveChannelTracker activeChannelTracker = mock( ActiveChannelTracker.class ); - TestConnectionPool pool = new TestConnectionPool( activeChannelTracker ); + NettyChannelTracker nettyChannelTracker = mock( NettyChannelTracker.class ); + TestConnectionPool pool = new TestConnectionPool( nettyChannelTracker ); pool.acquire( ADDRESS_1 ); pool.acquire( ADDRESS_2 ); pool.acquire( ADDRESS_3 ); - when( activeChannelTracker.activeChannelCount( ADDRESS_1 ) ).thenReturn( 1 ); - when( activeChannelTracker.activeChannelCount( ADDRESS_2 ) ).thenReturn( 42 ); - when( activeChannelTracker.activeChannelCount( ADDRESS_3 ) ).thenReturn( 0 ); + when( nettyChannelTracker.inUseChannelCount( ADDRESS_1 ) ).thenReturn( 1 ); + when( nettyChannelTracker.inUseChannelCount( ADDRESS_2 ) ).thenReturn( 42 ); + when( nettyChannelTracker.inUseChannelCount( ADDRESS_3 ) ).thenReturn( 0 ); pool.retainAll( singleton( ADDRESS_2 ) ); verify( pool.getPool( ADDRESS_1 ), never() ).close(); @@ -219,7 +220,7 @@ private ConnectionPoolImpl newPool() throws Exception DEV_NULL_LOGGING, clock ); PoolSettings poolSettings = newSettings(); Bootstrap bootstrap = BootstrapFactory.newBootstrap( 1 ); - return new ConnectionPoolImpl( connector, bootstrap, poolSettings, DEV_NULL_LOGGING, clock ); + return new ConnectionPoolImpl( connector, bootstrap, poolSettings, DEV_NULL_METRICS, DEV_NULL_LOGGING, clock ); } private static PoolSettings newSettings() @@ -231,10 +232,10 @@ private static class TestConnectionPool extends ConnectionPoolImpl { final Map channelPoolsByAddress = new HashMap<>(); - TestConnectionPool( ActiveChannelTracker activeChannelTracker ) + TestConnectionPool( NettyChannelTracker nettyChannelTracker ) { - super( mock( ChannelConnector.class ), mock( Bootstrap.class ), activeChannelTracker, newSettings(), - DEV_NULL_LOGGING, new FakeClock() ); + super( mock( ChannelConnector.class ), mock( Bootstrap.class ), nettyChannelTracker, newSettings(), + DEV_NULL_METRICS, DEV_NULL_LOGGING, new FakeClock() ); } ChannelPool getPool( BoltServerAddress address ) diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelPoolTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelPoolTest.java index dd640e49d2..93ce7774c7 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelPoolTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelPoolTest.java @@ -21,7 +21,6 @@ import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.pool.ChannelHealthChecker; -import io.netty.channel.pool.ChannelPoolHandler; import io.netty.util.concurrent.Future; import org.junit.After; import org.junit.Before; @@ -58,6 +57,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; +import static org.neo4j.driver.internal.metrics.InternalAbstractMetrics.DEV_NULL_METRICS; import static org.neo4j.driver.v1.Values.value; public class NettyChannelPoolTest @@ -66,14 +66,14 @@ public class NettyChannelPoolTest public final TestNeo4j neo4j = new TestNeo4j(); private Bootstrap bootstrap; - private ChannelPoolHandler poolHandler; + private NettyChannelTracker poolHandler; private NettyChannelPool pool; @Before public void setUp() { bootstrap = BootstrapFactory.newBootstrap( 1 ); - poolHandler = mock( ChannelPoolHandler.class ); + poolHandler = mock( NettyChannelTracker.class ); } @After @@ -183,24 +183,24 @@ public void shouldLimitNumberOfConcurrentConnections() throws Exception @Test public void shouldTrackActiveChannels() throws Exception { - ActiveChannelTracker activeChannelTracker = new ActiveChannelTracker( DEV_NULL_LOGGING ); + NettyChannelTracker tracker = new NettyChannelTracker( DEV_NULL_METRICS, DEV_NULL_LOGGING ); - poolHandler = activeChannelTracker; + poolHandler = tracker; pool = newPool( neo4j.authToken() ); Channel channel1 = acquire( pool ); Channel channel2 = acquire( pool ); Channel channel3 = acquire( pool ); - assertEquals( 3, activeChannelTracker.activeChannelCount( neo4j.address() ) ); + assertEquals( 3, tracker.inUseChannelCount( neo4j.address() ) ); release( channel1 ); release( channel2 ); release( channel3 ); - assertEquals( 0, activeChannelTracker.activeChannelCount( neo4j.address() ) ); + assertEquals( 0, tracker.inUseChannelCount( neo4j.address() ) ); assertNotNull( acquire( pool ) ); assertNotNull( acquire( pool ) ); - assertEquals( 2, activeChannelTracker.activeChannelCount( neo4j.address() ) ); + assertEquals( 2, tracker.inUseChannelCount( neo4j.address() ) ); } private NettyChannelPool newPool( AuthToken authToken ) diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelTrackerTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelTrackerTest.java new file mode 100644 index 0000000000..122478543e --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelTrackerTest.java @@ -0,0 +1,141 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.async.pool; + +import io.netty.channel.Channel; +import io.netty.channel.embedded.EmbeddedChannel; +import org.junit.Test; + +import org.neo4j.driver.internal.BoltServerAddress; + +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; +import static org.neo4j.driver.internal.async.ChannelAttributes.setServerAddress; +import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; +import static org.neo4j.driver.internal.metrics.InternalAbstractMetrics.DEV_NULL_METRICS; + +public class NettyChannelTrackerTest +{ + private final BoltServerAddress address = BoltServerAddress.LOCAL_DEFAULT; + private final NettyChannelTracker tracker = new NettyChannelTracker( DEV_NULL_METRICS, DEV_NULL_LOGGING ); + + @Test + public void shouldIncrementInUseCountWhenChannelCreated() + { + Channel channel = newChannel(); + assertEquals( 0, tracker.inUseChannelCount( address ) ); + assertEquals( 0, tracker.idleChannelCount( address ) ); + + tracker.channelCreated( channel ); + assertEquals( 1, tracker.inUseChannelCount( address ) ); + assertEquals( 0, tracker.idleChannelCount( address ) ); + } + + @Test + public void shouldIncrementInUseCountWhenChannelAcquired() + { + Channel channel = newChannel(); + assertEquals( 0, tracker.inUseChannelCount( address ) ); + assertEquals( 0, tracker.idleChannelCount( address ) ); + + tracker.channelCreated( channel ); + assertEquals( 1, tracker.inUseChannelCount( address ) ); + assertEquals( 0, tracker.idleChannelCount( address ) ); + + tracker.channelReleased( channel ); + assertEquals( 0, tracker.inUseChannelCount( address ) ); + assertEquals( 1, tracker.idleChannelCount( address ) ); + + tracker.channelAcquired( channel ); + assertEquals( 1, tracker.inUseChannelCount( address ) ); + assertEquals( 0, tracker.idleChannelCount( address ) ); + } + + @Test + public void shouldIncrementInuseCountForAddress() + { + Channel channel1 = newChannel(); + Channel channel2 = newChannel(); + Channel channel3 = newChannel(); + + assertEquals( 0, tracker.inUseChannelCount( address ) ); + tracker.channelCreated( channel1 ); + assertEquals( 1, tracker.inUseChannelCount( address ) ); + tracker.channelCreated( channel2 ); + assertEquals( 2, tracker.inUseChannelCount( address ) ); + tracker.channelCreated( channel3 ); + assertEquals( 3, tracker.inUseChannelCount( address ) ); + assertEquals( 0, tracker.idleChannelCount( address ) ); + } + + @Test + public void shouldDecrementCountForAddress() + { + Channel channel1 = newChannel(); + Channel channel2 = newChannel(); + Channel channel3 = newChannel(); + + tracker.channelCreated( channel1 ); + tracker.channelCreated( channel2 ); + tracker.channelCreated( channel3 ); + assertEquals( 3, tracker.inUseChannelCount( address ) ); + assertEquals( 0, tracker.idleChannelCount( address ) ); + + tracker.channelReleased( channel1 ); + assertEquals( 2, tracker.inUseChannelCount( address ) ); + assertEquals( 1, tracker.idleChannelCount( address ) ); + tracker.channelReleased( channel2 ); + assertEquals( 1, tracker.inUseChannelCount( address ) ); + assertEquals( 2, tracker.idleChannelCount( address ) ); + tracker.channelReleased( channel3 ); + assertEquals( 0, tracker.inUseChannelCount( address ) ); + assertEquals( 3, tracker.idleChannelCount( address ) ); + } + + @Test + public void shouldThrowWhenDecrementingForUnknownAddress() + { + Channel channel = newChannel(); + + try + { + tracker.channelReleased( channel ); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertThat( e, instanceOf( IllegalStateException.class ) ); + } + } + + @Test + public void shouldReturnZeroActiveCountForUnknownAddress() + { + assertEquals( 0, tracker.inUseChannelCount( address ) ); + } + + private Channel newChannel() + { + EmbeddedChannel channel = new EmbeddedChannel(); + setServerAddress( channel, address ); + return channel; + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LeastConnectedLoadBalancingStrategyTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LeastConnectedLoadBalancingStrategyTest.java index 6b1f39a8bb..aba9e7b478 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LeastConnectedLoadBalancingStrategyTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LeastConnectedLoadBalancingStrategyTest.java @@ -85,7 +85,7 @@ public void shouldHandleSingleWriterWithoutActiveConnections() public void shouldHandleSingleReaderWithActiveConnections() { BoltServerAddress address = new BoltServerAddress( "reader", 9999 ); - when( connectionPool.activeConnections( address ) ).thenReturn( 42 ); + when( connectionPool.inUseConnections( address ) ).thenReturn( 42 ); assertEquals( address, strategy.selectReader( new BoltServerAddress[]{address} ) ); } @@ -94,7 +94,7 @@ public void shouldHandleSingleReaderWithActiveConnections() public void shouldHandleSingleWriterWithActiveConnections() { BoltServerAddress address = new BoltServerAddress( "writer", 9999 ); - when( connectionPool.activeConnections( address ) ).thenReturn( 24 ); + when( connectionPool.inUseConnections( address ) ).thenReturn( 24 ); assertEquals( address, strategy.selectWriter( new BoltServerAddress[]{address} ) ); } @@ -106,9 +106,9 @@ public void shouldHandleMultipleReadersWithActiveConnections() BoltServerAddress address2 = new BoltServerAddress( "reader", 2 ); BoltServerAddress address3 = new BoltServerAddress( "reader", 3 ); - when( connectionPool.activeConnections( address1 ) ).thenReturn( 3 ); - when( connectionPool.activeConnections( address2 ) ).thenReturn( 4 ); - when( connectionPool.activeConnections( address3 ) ).thenReturn( 1 ); + when( connectionPool.inUseConnections( address1 ) ).thenReturn( 3 ); + when( connectionPool.inUseConnections( address2 ) ).thenReturn( 4 ); + when( connectionPool.inUseConnections( address3 ) ).thenReturn( 1 ); assertEquals( address3, strategy.selectReader( new BoltServerAddress[]{address1, address2, address3} ) ); } @@ -121,10 +121,10 @@ public void shouldHandleMultipleWritersWithActiveConnections() BoltServerAddress address3 = new BoltServerAddress( "writer", 3 ); BoltServerAddress address4 = new BoltServerAddress( "writer", 4 ); - when( connectionPool.activeConnections( address1 ) ).thenReturn( 5 ); - when( connectionPool.activeConnections( address2 ) ).thenReturn( 6 ); - when( connectionPool.activeConnections( address3 ) ).thenReturn( 0 ); - when( connectionPool.activeConnections( address4 ) ).thenReturn( 1 ); + when( connectionPool.inUseConnections( address1 ) ).thenReturn( 5 ); + when( connectionPool.inUseConnections( address2 ) ).thenReturn( 6 ); + when( connectionPool.inUseConnections( address3 ) ).thenReturn( 0 ); + when( connectionPool.inUseConnections( address4 ) ).thenReturn( 1 ); assertEquals( address3, strategy.selectWriter( new BoltServerAddress[]{address1, address2, address3, address4} ) ); @@ -182,7 +182,7 @@ public void shouldTraceLogSelectedAddress() Logger logger = mock( Logger.class ); when( logging.getLog( anyString() ) ).thenReturn( logger ); - when( connectionPool.activeConnections( any( BoltServerAddress.class ) ) ).thenReturn( 42 ); + when( connectionPool.inUseConnections( any( BoltServerAddress.class ) ) ).thenReturn( 42 ); LoadBalancingStrategy strategy = new LeastConnectedLoadBalancingStrategy( connectionPool, logging ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java index d1c85ff462..47fc02b1b6 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java @@ -171,9 +171,9 @@ public void shouldSelectLeastConnectedAddress() { ConnectionPool connectionPool = newConnectionPoolMock(); - when( connectionPool.activeConnections( A ) ).thenReturn( 0 ); - when( connectionPool.activeConnections( B ) ).thenReturn( 20 ); - when( connectionPool.activeConnections( C ) ).thenReturn( 0 ); + when( connectionPool.inUseConnections( A ) ).thenReturn( 0 ); + when( connectionPool.inUseConnections( B ) ).thenReturn( 20 ); + when( connectionPool.inUseConnections( C ) ).thenReturn( 0 ); RoutingTable routingTable = mock( RoutingTable.class ); AddressSet readerAddresses = mock( AddressSet.class ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/metrics/InternalHistogramTest.java b/driver/src/test/java/org/neo4j/driver/internal/metrics/InternalHistogramTest.java new file mode 100644 index 0000000000..66e17ae2cb --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/metrics/InternalHistogramTest.java @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.metrics; + +import org.junit.Test; + +import org.neo4j.driver.internal.metrics.spi.Histogram; + +import static java.lang.Math.abs; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.lessThan; +import static org.junit.Assert.assertThat; + +public class InternalHistogramTest +{ + @Test + public void shouldRecordSmallValuesPrecisely() throws Throwable + { + // Given + InternalHistogram histogram = new InternalHistogram(); + histogram.recordValue( 0 ); + histogram.recordValue( 1 ); + histogram.recordValue( 2 ); + histogram.recordValue( 3 ); + histogram.recordValue( 4 ); + + // When + assertThat( histogram.min(), equalTo( 0L ) ); + assertThat( histogram.max(), equalTo( 4L ) ); + assertThat( histogram.mean(), equalTo( 2.0 ) ); + assertThat( histogram.totalCount(), equalTo( 5L ) ); + } + + @Test + public void shouldRecordBigValuesPrecisely() throws Throwable + { + // Given + InternalHistogram histogram = new InternalHistogram(); + histogram.recordValue( 0 ); + histogram.recordValue( 100000 ); + histogram.recordValue( 200000 ); + histogram.recordValue( 300000 ); + histogram.recordValue( 400000 ); + + // When + assertThat( histogram.min(), equalTo( 0L ) ); + assertThat( abs( histogram.max() - 400000L ), lessThan( 500L ) ); + assertThat( abs( histogram.mean() - 200000.0 ), lessThan( 500.0 ) ); + assertThat( histogram.totalCount(), equalTo( 5L ) ); + } + + @Test + public void shouldResetOnOriginalHistogram() throws Throwable + { + // Given + InternalHistogram histogram = new InternalHistogram(); + histogram.recordValue( 0 ); + histogram.recordValue( 1 ); + histogram.recordValue( 2 ); + histogram.recordValue( 3 ); + histogram.recordValue( 4 ); + + // When + assertThat( histogram.totalCount(), equalTo( 5L ) ); + Histogram snapshot = histogram.snapshot(); + snapshot.reset(); + + // Then + assertThat( histogram.totalCount(), equalTo( 0L ) ); + } + +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/ChannelTrackingDriverFactory.java b/driver/src/test/java/org/neo4j/driver/internal/util/ChannelTrackingDriverFactory.java index cacd88711b..7c7980cc9d 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/util/ChannelTrackingDriverFactory.java +++ b/driver/src/test/java/org/neo4j/driver/internal/util/ChannelTrackingDriverFactory.java @@ -29,6 +29,7 @@ import org.neo4j.driver.internal.ConnectionSettings; import org.neo4j.driver.internal.async.BootstrapFactory; import org.neo4j.driver.internal.async.ChannelConnector; +import org.neo4j.driver.internal.metrics.MetricsListener; import org.neo4j.driver.internal.security.SecurityPlan; import org.neo4j.driver.internal.spi.ConnectionPool; import org.neo4j.driver.v1.AuthToken; @@ -70,10 +71,10 @@ protected final ChannelConnector createConnector( ConnectionSettings settings, S } @Override - protected final ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, - Bootstrap bootstrap, Config config ) + protected final ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap, MetricsListener metrics, + Config config ) { - pool = super.createConnectionPool( authToken, securityPlan, bootstrap, config ); + pool = super.createConnectionPool( authToken, securityPlan, bootstrap, metrics, config ); return pool; } @@ -102,6 +103,6 @@ public List pollChannels() public int activeChannels( BoltServerAddress address ) { - return pool == null ? 0 : pool.activeConnections( address ); + return pool == null ? 0 : pool.inUseConnections( address ); } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/FailingConnectionDriverFactory.java b/driver/src/test/java/org/neo4j/driver/internal/util/FailingConnectionDriverFactory.java index cb7ec6cba2..cdb4e27305 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/util/FailingConnectionDriverFactory.java +++ b/driver/src/test/java/org/neo4j/driver/internal/util/FailingConnectionDriverFactory.java @@ -27,6 +27,7 @@ import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.internal.DriverFactory; +import org.neo4j.driver.internal.metrics.MetricsListener; import org.neo4j.driver.internal.security.SecurityPlan; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ConnectionPool; @@ -40,10 +41,9 @@ public class FailingConnectionDriverFactory extends DriverFactory private final AtomicReference nextRunFailure = new AtomicReference<>(); @Override - protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap, - Config config ) + protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap, MetricsListener metrics, Config config ) { - ConnectionPool pool = super.createConnectionPool( authToken, securityPlan, bootstrap, config ); + ConnectionPool pool = super.createConnectionPool( authToken, securityPlan, bootstrap, metrics, config ); return new ConnectionPoolWithFailingConnections( pool, nextRunFailure ); } @@ -77,9 +77,15 @@ public void retainAll( Set addressesToRetain ) } @Override - public int activeConnections( BoltServerAddress address ) + public int inUseConnections( BoltServerAddress address ) { - return delegate.activeConnections( address ); + return delegate.inUseConnections( address ); + } + + @Override + public int idleConnections( BoltServerAddress address ) + { + return delegate.idleConnections( address ); } @Override @@ -87,6 +93,12 @@ public CompletionStage close() { return delegate.close(); } + + @Override + public boolean isOpen() + { + return delegate.isOpen(); + } } private static class FailingConnection implements Connection diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/ConnectionHandlingIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/ConnectionHandlingIT.java index 93ff0cc96d..f2d61d885b 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/ConnectionHandlingIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/ConnectionHandlingIT.java @@ -36,6 +36,7 @@ import org.neo4j.driver.internal.async.pool.ConnectionPoolImpl; import org.neo4j.driver.internal.async.pool.PoolSettings; import org.neo4j.driver.internal.cluster.RoutingSettings; +import org.neo4j.driver.internal.metrics.MetricsListener; import org.neo4j.driver.internal.retry.RetrySettings; import org.neo4j.driver.internal.security.SecurityPlan; import org.neo4j.driver.internal.spi.Connection; @@ -64,6 +65,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; +import static org.neo4j.driver.internal.metrics.InternalAbstractMetrics.DEV_NULL_METRICS; import static org.neo4j.driver.v1.Config.defaultConfig; import static org.neo4j.driver.v1.Values.parameters; import static org.neo4j.driver.v1.util.TestUtil.await; @@ -294,8 +296,7 @@ private static class DriverFactoryWithConnectionPool extends DriverFactory MemorizingConnectionPool connectionPool; @Override - protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, - Bootstrap bootstrap, Config config ) + protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap, MetricsListener metrics, Config config ) { ConnectionSettings connectionSettings = new ConnectionSettings( authToken, 1000 ); PoolSettings poolSettings = new PoolSettings( config.maxConnectionPoolSize(), @@ -317,7 +318,7 @@ private static class MemorizingConnectionPool extends ConnectionPoolImpl MemorizingConnectionPool( ChannelConnector connector, Bootstrap bootstrap, PoolSettings settings, Logging logging, Clock clock ) { - super( connector, bootstrap, settings, logging, clock ); + super( connector, bootstrap, settings, DEV_NULL_METRICS, logging, clock ); } diff --git a/driver/src/test/java/org/neo4j/driver/v1/stress/AbstractStressTestBase.java b/driver/src/test/java/org/neo4j/driver/v1/stress/AbstractStressTestBase.java index cc2377c3aa..af894c913b 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/stress/AbstractStressTestBase.java +++ b/driver/src/test/java/org/neo4j/driver/v1/stress/AbstractStressTestBase.java @@ -44,6 +44,7 @@ import java.util.function.Function; import java.util.logging.Level; +import org.neo4j.driver.internal.InternalDriver; import org.neo4j.driver.internal.logging.ConsoleLogging; import org.neo4j.driver.internal.logging.DevNullLogger; import org.neo4j.driver.internal.util.Futures; @@ -73,6 +74,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import static org.neo4j.driver.internal.metrics.spi.Metrics.DRIVER_METRICS_ENABLED_KEY; public abstract class AbstractStressTestBase { @@ -87,11 +89,12 @@ public abstract class AbstractStressTestBase private LoggerNameTrackingLogging logging; private ExecutorService executor; - Driver driver; + InternalDriver driver; @Before public void setUp() { + System.setProperty( DRIVER_METRICS_ENABLED_KEY, "true" ); logging = new LoggerNameTrackingLogging(); Config config = Config.build() @@ -100,7 +103,8 @@ public void setUp() .withConnectionAcquisitionTimeout( 1, MINUTES ) .toConfig(); - driver = GraphDatabase.driver( databaseUri(), authToken(), config ); + driver = (InternalDriver) GraphDatabase.driver( databaseUri(), authToken(), config ); + System.setProperty( DRIVER_METRICS_ENABLED_KEY, "false" ); ThreadFactory threadFactory = new DaemonThreadFactory( getClass().getSimpleName() + "-worker-" ); executor = Executors.newCachedThreadPool( threadFactory ); @@ -109,10 +113,12 @@ public void setUp() @After public void tearDown() { + System.out.println( driver.metrics() ); executor.shutdownNow(); if ( driver != null ) { driver.close(); + System.out.println( driver.metrics() ); } } diff --git a/pom.xml b/pom.xml index ad103e574f..ddcd87b37f 100644 --- a/pom.xml +++ b/pom.xml @@ -56,6 +56,11 @@ netty-handler 4.1.16.Final + + org.hdrhistogram + HdrHistogram + 2.1.10 +