@@ -208,6 +212,7 @@
io.netty:*
+ org.hdrhistogram:*
@@ -215,6 +220,10 @@
io.netty
org.neo4j.driver.internal.shaded.io.netty
+
+ org.HdrHistogram
+ org.neo4j.driver.internal.shaded.org.HdrHistogram
+
true
diff --git a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java
index 9de1eca0b1..7f0a2d533d 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java
@@ -38,6 +38,10 @@
import org.neo4j.driver.internal.cluster.loadbalancing.LoadBalancingStrategy;
import org.neo4j.driver.internal.cluster.loadbalancing.RoundRobinLoadBalancingStrategy;
import org.neo4j.driver.internal.logging.NettyLogging;
+import org.neo4j.driver.internal.metrics.MetricsListener;
+import org.neo4j.driver.internal.metrics.InternalAbstractMetrics;
+import org.neo4j.driver.internal.metrics.InternalMetrics;
+import org.neo4j.driver.internal.metrics.spi.Metrics;
import org.neo4j.driver.internal.retry.ExponentialBackoffRetryLogic;
import org.neo4j.driver.internal.retry.RetryLogic;
import org.neo4j.driver.internal.retry.RetrySettings;
@@ -56,6 +60,8 @@
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
import static java.lang.String.format;
+import static org.neo4j.driver.internal.metrics.InternalAbstractMetrics.DEV_NULL_METRICS;
+import static org.neo4j.driver.internal.metrics.spi.Metrics.isMetricsEnabled;
import static org.neo4j.driver.internal.security.SecurityPlan.insecure;
public class DriverFactory
@@ -77,18 +83,17 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
EventExecutorGroup eventExecutorGroup = bootstrap.config().group();
RetryLogic retryLogic = createRetryLogic( retrySettings, eventExecutorGroup, config.logging() );
- ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, bootstrap, config );
+ InternalAbstractMetrics metrics = createDriverMetrics( config );
+ ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, bootstrap, metrics, config );
- InternalDriver driver = createDriver( uri, address, connectionPool, config, newRoutingSettings,
- eventExecutorGroup, securityPlan, retryLogic );
+ InternalDriver driver = createDriver( uri, securityPlan, address, connectionPool, eventExecutorGroup, newRoutingSettings, retryLogic, metrics, config );
verifyConnectivity( driver, connectionPool, config );
return driver;
}
- protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan,
- Bootstrap bootstrap, Config config )
+ protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap, MetricsListener metrics, Config config )
{
Clock clock = createClock();
ConnectionSettings settings = new ConnectionSettings( authToken, config.connectionTimeoutMillis() );
@@ -97,7 +102,19 @@ protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan
config.connectionAcquisitionTimeoutMillis(), config.maxConnectionLifetimeMillis(),
config.idleTimeBeforeConnectionTest()
);
- return new ConnectionPoolImpl( connector, bootstrap, poolSettings, config.logging(), clock );
+ return new ConnectionPoolImpl( connector, bootstrap, poolSettings, metrics, config.logging(), clock );
+ }
+
+ protected static InternalAbstractMetrics createDriverMetrics( Config config )
+ {
+ if( isMetricsEnabled() )
+ {
+ return new InternalMetrics( config );
+ }
+ else
+ {
+ return DEV_NULL_METRICS;
+ }
}
protected ChannelConnector createConnector( ConnectionSettings settings, SecurityPlan securityPlan,
@@ -106,9 +123,8 @@ protected ChannelConnector createConnector( ConnectionSettings settings, Securit
return new ChannelConnectorImpl( settings, securityPlan, config.logging(), clock );
}
- private InternalDriver createDriver( URI uri, BoltServerAddress address,
- ConnectionPool connectionPool, Config config, RoutingSettings routingSettings,
- EventExecutorGroup eventExecutorGroup, SecurityPlan securityPlan, RetryLogic retryLogic )
+ private InternalDriver createDriver( URI uri, SecurityPlan securityPlan, BoltServerAddress address, ConnectionPool connectionPool,
+ EventExecutorGroup eventExecutorGroup, RoutingSettings routingSettings, RetryLogic retryLogic, Metrics metrics, Config config )
{
try
{
@@ -117,10 +133,9 @@ private InternalDriver createDriver( URI uri, BoltServerAddress address,
{
case BOLT_URI_SCHEME:
assertNoRoutingContext( uri, routingSettings );
- return createDirectDriver( address, config, securityPlan, retryLogic, connectionPool );
+ return createDirectDriver( securityPlan, address, connectionPool, retryLogic, metrics, config );
case BOLT_ROUTING_URI_SCHEME:
- return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan, retryLogic,
- eventExecutorGroup );
+ return createRoutingDriver( securityPlan, address, connectionPool, eventExecutorGroup, routingSettings, retryLogic, metrics, config );
default:
throw new ClientException( format( "Unsupported URI scheme: %s", scheme ) );
}
@@ -138,12 +153,12 @@ private InternalDriver createDriver( URI uri, BoltServerAddress address,
*
* This method is protected only for testing
*/
- protected InternalDriver createDirectDriver( BoltServerAddress address, Config config,
- SecurityPlan securityPlan, RetryLogic retryLogic, ConnectionPool connectionPool )
+ protected InternalDriver createDirectDriver( SecurityPlan securityPlan, BoltServerAddress address, ConnectionPool connectionPool, RetryLogic retryLogic,
+ Metrics metrics, Config config )
{
ConnectionProvider connectionProvider = new DirectConnectionProvider( address, connectionPool );
SessionFactory sessionFactory = createSessionFactory( connectionProvider, retryLogic, config );
- return createDriver( sessionFactory, securityPlan, config );
+ return createDriver( securityPlan, sessionFactory, metrics, config );
}
/**
@@ -151,9 +166,8 @@ protected InternalDriver createDirectDriver( BoltServerAddress address, Config c
*
* This method is protected only for testing
*/
- protected InternalDriver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool,
- Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic,
- EventExecutorGroup eventExecutorGroup )
+ protected InternalDriver createRoutingDriver( SecurityPlan securityPlan, BoltServerAddress address, ConnectionPool connectionPool,
+ EventExecutorGroup eventExecutorGroup, RoutingSettings routingSettings, RetryLogic retryLogic, Metrics metrics, Config config )
{
if ( !securityPlan.isRoutingCompatible() )
{
@@ -162,7 +176,7 @@ protected InternalDriver createRoutingDriver( BoltServerAddress address, Connect
ConnectionProvider connectionProvider = createLoadBalancer( address, connectionPool, eventExecutorGroup,
config, routingSettings );
SessionFactory sessionFactory = createSessionFactory( connectionProvider, retryLogic, config );
- return createDriver( sessionFactory, securityPlan, config );
+ return createDriver( securityPlan, sessionFactory, metrics, config );
}
/**
@@ -170,9 +184,9 @@ protected InternalDriver createRoutingDriver( BoltServerAddress address, Connect
*
* This method is protected only for testing
*/
- protected InternalDriver createDriver( SessionFactory sessionFactory, SecurityPlan securityPlan, Config config )
+ protected InternalDriver createDriver( SecurityPlan securityPlan, SessionFactory sessionFactory, Metrics metrics, Config config )
{
- return new InternalDriver( securityPlan, sessionFactory, config.logging() );
+ return new InternalDriver( securityPlan, sessionFactory, metrics, config.logging() );
}
/**
diff --git a/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java b/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java
index 2528d493fd..3378ab3531 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java
@@ -21,6 +21,7 @@
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.neo4j.driver.internal.metrics.spi.Metrics;
import org.neo4j.driver.internal.security.SecurityPlan;
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.v1.AccessMode;
@@ -38,11 +39,13 @@ public class InternalDriver implements Driver
private final Logger log;
private AtomicBoolean closed = new AtomicBoolean( false );
+ private final Metrics metrics;
- InternalDriver( SecurityPlan securityPlan, SessionFactory sessionFactory, Logging logging )
+ InternalDriver( SecurityPlan securityPlan, SessionFactory sessionFactory, Metrics metrics, Logging logging )
{
this.securityPlan = securityPlan;
this.sessionFactory = sessionFactory;
+ this.metrics = metrics;
this.log = logging.getLog( Driver.class.getSimpleName() );
log.info( "Driver instance %s created", this );
}
@@ -144,6 +147,11 @@ private void assertOpen()
}
}
+ public Metrics metrics()
+ {
+ return this.metrics;
+ }
+
private static RuntimeException driverCloseException()
{
return new IllegalStateException( "This driver instance has already been closed" );
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java b/driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java
index 3d3bb0c9e6..8035010730 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java
@@ -34,6 +34,8 @@
import org.neo4j.driver.internal.messaging.PullAllMessage;
import org.neo4j.driver.internal.messaging.ResetMessage;
import org.neo4j.driver.internal.messaging.RunMessage;
+import org.neo4j.driver.internal.metrics.ListenerEvent;
+import org.neo4j.driver.internal.metrics.MetricsListener;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.spi.ResponseHandler;
import org.neo4j.driver.internal.util.Clock;
@@ -54,8 +56,10 @@ public class NettyConnection implements Connection
private final Clock clock;
private final AtomicReference status = new AtomicReference<>( Status.OPEN );
+ private final MetricsListener metricsListener;
+ private final ListenerEvent inUseEvent;
- public NettyConnection( Channel channel, ChannelPool channelPool, Clock clock )
+ public NettyConnection( Channel channel, ChannelPool channelPool, Clock clock, MetricsListener metricsListener )
{
this.channel = channel;
this.messageDispatcher = ChannelAttributes.messageDispatcher( channel );
@@ -64,6 +68,9 @@ public NettyConnection( Channel channel, ChannelPool channelPool, Clock clock )
this.channelPool = channelPool;
this.releaseFuture = new CompletableFuture<>();
this.clock = clock;
+ this.metricsListener = metricsListener;
+ this.inUseEvent = metricsListener.createListenerEvent();
+ metricsListener.afterAcquiredOrCreated( this.serverAddress, this.inUseEvent );
}
@Override
@@ -124,6 +131,7 @@ public CompletionStage release()
{
if ( status.compareAndSet( Status.OPEN, Status.RELEASED ) )
{
+ metricsListener.afterReleased( this.serverAddress, this.inUseEvent );
ChannelReleasingResetResponseHandler handler = new ChannelReleasingResetResponseHandler( channel,
channelPool, messageDispatcher, clock, releaseFuture );
@@ -137,6 +145,7 @@ public void terminateAndRelease( String reason )
{
if ( status.compareAndSet( Status.OPEN, Status.TERMINATED ) )
{
+ metricsListener.afterReleased( this.serverAddress, this.inUseEvent );
setTerminationReason( channel, reason );
channel.close();
channelPool.release( channel );
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/pool/ActiveChannelTracker.java b/driver/src/main/java/org/neo4j/driver/internal/async/pool/ActiveChannelTracker.java
deleted file mode 100644
index 728c96da8a..0000000000
--- a/driver/src/main/java/org/neo4j/driver/internal/async/pool/ActiveChannelTracker.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Copyright (c) 2002-2018 "Neo Technology,"
- * Network Engine for Objects in Lund AB [http://neotechnology.com]
- *
- * This file is part of Neo4j.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.neo4j.driver.internal.async.pool;
-
-import io.netty.channel.Channel;
-import io.netty.channel.pool.ChannelPoolHandler;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.neo4j.driver.internal.BoltServerAddress;
-import org.neo4j.driver.v1.Logger;
-import org.neo4j.driver.v1.Logging;
-
-import static org.neo4j.driver.internal.async.ChannelAttributes.serverAddress;
-
-public class ActiveChannelTracker implements ChannelPoolHandler
-{
- private final Map addressToActiveChannelCount = new ConcurrentHashMap<>();
- private final Logger log;
-
- public ActiveChannelTracker( Logging logging )
- {
- this.log = logging.getLog( getClass().getSimpleName() );
- }
-
- @Override
- public void channelReleased( Channel channel )
- {
- log.debug( "Channel %s released back to the pool", channel );
- channelInactive( channel );
- }
-
- @Override
- public void channelAcquired( Channel channel )
- {
- log.debug( "Channel %s acquired from the pool", channel );
- channelActive( channel );
- }
-
- @Override
- public void channelCreated( Channel channel )
- {
- log.debug( "Channel %s created", channel );
- channelActive( channel );
- }
-
- public int activeChannelCount( BoltServerAddress address )
- {
- AtomicInteger count = addressToActiveChannelCount.get( address );
- return count == null ? 0 : count.get();
- }
-
- private void channelActive( Channel channel )
- {
- BoltServerAddress address = serverAddress( channel );
- AtomicInteger count = addressToActiveChannelCount.computeIfAbsent( address, k -> new AtomicInteger() );
- count.incrementAndGet();
- }
-
- private void channelInactive( Channel channel )
- {
- BoltServerAddress address = serverAddress( channel );
- AtomicInteger count = addressToActiveChannelCount.get( address );
- if ( count == null )
- {
- throw new IllegalStateException( "No count exist for address '" + address + "'" );
- }
- count.decrementAndGet();
- }
-}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java b/driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java
index 264e6286bd..474ec9e56e 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java
@@ -36,6 +36,8 @@
import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.async.ChannelConnector;
import org.neo4j.driver.internal.async.NettyConnection;
+import org.neo4j.driver.internal.metrics.ListenerEvent;
+import org.neo4j.driver.internal.metrics.MetricsListener;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.spi.ConnectionPool;
import org.neo4j.driver.internal.util.Clock;
@@ -48,29 +50,31 @@ public class ConnectionPoolImpl implements ConnectionPool
{
private final ChannelConnector connector;
private final Bootstrap bootstrap;
- private final ActiveChannelTracker activeChannelTracker;
+ private final NettyChannelTracker nettyChannelTracker;
private final NettyChannelHealthChecker channelHealthChecker;
private final PoolSettings settings;
private final Clock clock;
private final Logger log;
+ private MetricsListener metricsListener;
private final ConcurrentMap pools = new ConcurrentHashMap<>();
private final AtomicBoolean closed = new AtomicBoolean();
public ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, PoolSettings settings,
- Logging logging, Clock clock )
+ MetricsListener metricsListener, Logging logging, Clock clock )
{
- this( connector, bootstrap, new ActiveChannelTracker( logging ), settings, logging, clock );
+ this( connector, bootstrap, new NettyChannelTracker( metricsListener, logging ), settings, metricsListener, logging, clock );
}
- ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, ActiveChannelTracker activeChannelTracker,
- PoolSettings settings, Logging logging, Clock clock )
+ ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, NettyChannelTracker nettyChannelTracker,
+ PoolSettings settings, MetricsListener metricsListener, Logging logging, Clock clock )
{
this.connector = connector;
this.bootstrap = bootstrap;
- this.activeChannelTracker = activeChannelTracker;
+ this.nettyChannelTracker = nettyChannelTracker;
this.channelHealthChecker = new NettyChannelHealthChecker( settings, clock, logging );
this.settings = settings;
+ this.metricsListener = metricsListener;
this.clock = clock;
this.log = logging.getLog( ConnectionPool.class.getSimpleName() );
}
@@ -82,13 +86,23 @@ public CompletionStage acquire( BoltServerAddress address )
assertNotClosed();
ChannelPool pool = getOrCreatePool( address );
+
+ ListenerEvent acquireEvent = metricsListener.createListenerEvent();
+ metricsListener.beforeAcquiringOrCreating( address, acquireEvent );
Future connectionFuture = pool.acquire();
return Futures.asCompletionStage( connectionFuture ).handle( ( channel, error ) ->
{
- processAcquisitionError( error );
- assertNotClosed( address, channel, pool );
- return new NettyConnection( channel, pool, clock );
+ try
+ {
+ processAcquisitionError( error );
+ assertNotClosed( address, channel, pool );
+ return new NettyConnection( channel, pool, clock, metricsListener );
+ }
+ finally
+ {
+ metricsListener.afterAcquiringOrCreating( address, acquireEvent );
+ }
} );
}
@@ -99,7 +113,7 @@ public void retainAll( Set addressesToRetain )
{
if ( !addressesToRetain.contains( address ) )
{
- int activeChannels = activeChannelTracker.activeChannelCount( address );
+ int activeChannels = nettyChannelTracker.inUseChannelCount( address );
if ( activeChannels == 0 )
{
// address is not present in updated routing table and has no active connections
@@ -118,9 +132,15 @@ public void retainAll( Set addressesToRetain )
}
@Override
- public int activeConnections( BoltServerAddress address )
+ public int inUseConnections( BoltServerAddress address )
+ {
+ return nettyChannelTracker.inUseChannelCount( address );
+ }
+
+ @Override
+ public int idleConnections( BoltServerAddress address )
{
- return activeChannelTracker.activeChannelCount( address );
+ return nettyChannelTracker.idleChannelCount( address );
}
@Override
@@ -150,26 +170,38 @@ public CompletionStage close()
.thenApply( ignore -> null );
}
+ @Override
+ public boolean isOpen()
+ {
+ return !closed.get();
+ }
+
private ChannelPool getOrCreatePool( BoltServerAddress address )
{
ChannelPool pool = pools.get( address );
- if ( pool == null )
+ if ( pool != null )
{
- pool = newPool( address );
+ return pool;
+ }
- if ( pools.putIfAbsent( address, pool ) != null )
+ synchronized ( this )
+ {
+ pool = pools.get( address );
+ if ( pool != null )
{
- // We lost a race to create the pool, dispose of the one we created, and recurse
- pool.close();
- return getOrCreatePool( address );
+ return pool;
}
+
+ metricsListener.addMetrics( address, this );
+ pool = newPool( address );
+ pools.put( address, pool );
}
return pool;
}
ChannelPool newPool( BoltServerAddress address )
{
- return new NettyChannelPool( address, connector, bootstrap, activeChannelTracker, channelHealthChecker,
+ return new NettyChannelPool( address, connector, bootstrap, nettyChannelTracker, channelHealthChecker,
settings.connectionAcquisitionTimeout(), settings.maxConnectionPoolSize() );
}
@@ -217,4 +249,10 @@ private void assertNotClosed( BoltServerAddress address, Channel channel, Channe
assertNotClosed();
}
}
+
+ @Override
+ public String toString()
+ {
+ return "ConnectionPoolImpl{" + "pools=" + pools + '}';
+ }
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelPool.java b/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelPool.java
index 893f189f7d..e350ae0388 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelPool.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelPool.java
@@ -19,13 +19,14 @@
package org.neo4j.driver.internal.async.pool;
import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.pool.ChannelHealthChecker;
-import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.channel.pool.FixedChannelPool;
import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.async.ChannelConnector;
+import org.neo4j.driver.internal.metrics.ListenerEvent;
import static java.util.Objects.requireNonNull;
@@ -42,9 +43,10 @@ public class NettyChannelPool extends FixedChannelPool
private final BoltServerAddress address;
private final ChannelConnector connector;
+ private final NettyChannelTracker handler;
public NettyChannelPool( BoltServerAddress address, ChannelConnector connector, Bootstrap bootstrap,
- ChannelPoolHandler handler, ChannelHealthChecker healthCheck, long acquireTimeoutMillis,
+ NettyChannelTracker handler, ChannelHealthChecker healthCheck, long acquireTimeoutMillis,
int maxConnections )
{
super( bootstrap, handler, healthCheck, AcquireTimeoutAction.FAIL, acquireTimeoutMillis, maxConnections,
@@ -52,19 +54,28 @@ public NettyChannelPool( BoltServerAddress address, ChannelConnector connector,
this.address = requireNonNull( address );
this.connector = requireNonNull( connector );
+ this.handler = requireNonNull( handler );
}
@Override
protected ChannelFuture connectChannel( Bootstrap bootstrap )
{
+ ListenerEvent creatingEvent = handler.beforeChannelCreating( address );
ChannelFuture channelFuture = connector.connect( address, bootstrap );
channelFuture.addListener( future ->
{
if ( future.isSuccess() )
{
// notify pool handler about a successful connection
- handler().channelCreated( channelFuture.channel() );
+ Channel channel = channelFuture.channel();
+ handler.channelCreated( channel );
+ channel.closeFuture().addListener( closeFuture -> handler.channelClosed( channel ) );
}
+ else
+ {
+ handler.channelFailedToCreate( address );
+ }
+ handler.afterChannelCreating( address, creatingEvent );
} );
return channelFuture;
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelTracker.java b/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelTracker.java
new file mode 100644
index 0000000000..11e98cbad0
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelTracker.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright (c) 2002-2018 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal.async.pool;
+
+import io.netty.channel.Channel;
+import io.netty.channel.pool.ChannelPoolHandler;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.neo4j.driver.internal.BoltServerAddress;
+import org.neo4j.driver.internal.metrics.ListenerEvent;
+import org.neo4j.driver.internal.metrics.MetricsListener;
+import org.neo4j.driver.v1.Logger;
+import org.neo4j.driver.v1.Logging;
+
+import static org.neo4j.driver.internal.async.ChannelAttributes.serverAddress;
+
+public class NettyChannelTracker implements ChannelPoolHandler
+{
+ private final Map addressToInUseChannelCount = new ConcurrentHashMap<>();
+ private final Map addressToIdleChannelCount = new ConcurrentHashMap<>();
+ private final Logger log;
+ private final MetricsListener metricsListener;
+
+ public NettyChannelTracker( MetricsListener metricsListener, Logging logging )
+ {
+ this.metricsListener = metricsListener;
+ this.log = logging.getLog( getClass().getSimpleName() );
+ }
+
+ @Override
+ public void channelReleased( Channel channel )
+ {
+ log.debug( "Channel %s released back to the pool", channel );
+ decrementInUse( channel );
+ incrementIdle( channel );
+ }
+
+ @Override
+ public void channelAcquired( Channel channel )
+ {
+ log.debug( "Channel %s acquired from the pool", channel );
+ incrementInUse( channel );
+ decrementIdle( channel );
+ }
+
+ @Override
+ public void channelCreated( Channel channel )
+ {
+ log.debug( "Channel %s created", channel );
+ incrementInUse( channel );
+ metricsListener.afterCreated( serverAddress( channel ) );
+ }
+
+ public void channelFailedToCreate( BoltServerAddress address )
+ {
+ metricsListener.afterFailedToCreate( address );
+ }
+
+ public ListenerEvent beforeChannelCreating( BoltServerAddress address )
+ {
+ ListenerEvent creatingEvent = metricsListener.createListenerEvent();
+ metricsListener.beforeCreating( address, creatingEvent );
+ return creatingEvent;
+ }
+
+ public void afterChannelCreating( BoltServerAddress address, ListenerEvent creatingEvent )
+ {
+ metricsListener.afterCreating( address, creatingEvent );
+ }
+
+ public void channelClosed( Channel channel )
+ {
+ decrementIdle( channel );
+ metricsListener.afterClosed( serverAddress( channel ) );
+ }
+
+ public int inUseChannelCount( BoltServerAddress address )
+ {
+ AtomicInteger count = addressToInUseChannelCount.get( address );
+ return count == null ? 0 : count.get();
+ }
+
+ public int idleChannelCount( BoltServerAddress address )
+ {
+ AtomicInteger count = addressToIdleChannelCount.get( address );
+ return count == null ? 0 : count.get();
+ }
+
+ private void incrementInUse( Channel channel )
+ {
+ increment( channel, addressToInUseChannelCount );
+ }
+
+ private void decrementInUse( Channel channel )
+ {
+ decrement( channel, addressToInUseChannelCount );
+ }
+
+ private void incrementIdle( Channel channel )
+ {
+ increment( channel, addressToIdleChannelCount );
+ }
+
+ private void decrementIdle( Channel channel )
+ {
+ decrement( channel, addressToIdleChannelCount );
+ }
+
+ private void increment( Channel channel, Map countMap )
+ {
+ BoltServerAddress address = serverAddress( channel );
+ AtomicInteger count = countMap.computeIfAbsent( address, k -> new AtomicInteger() );
+ count.incrementAndGet();
+ }
+
+ private void decrement( Channel channel, Map countMap )
+ {
+ BoltServerAddress address = serverAddress( channel );
+ AtomicInteger count = countMap.get( address );
+ if ( count == null )
+ {
+ throw new IllegalStateException( "No count exist for address '" + address + "'" );
+ }
+ count.decrementAndGet();
+ }
+}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LeastConnectedLoadBalancingStrategy.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LeastConnectedLoadBalancingStrategy.java
index ff7df31fec..c85ff26254 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LeastConnectedLoadBalancingStrategy.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LeastConnectedLoadBalancingStrategy.java
@@ -77,7 +77,7 @@ private BoltServerAddress select( BoltServerAddress[] addresses, RoundRobinArray
do
{
BoltServerAddress address = addresses[index];
- int activeConnections = connectionPool.activeConnections( address );
+ int activeConnections = connectionPool.inUseConnections( address );
if ( activeConnections < leastActiveConnections )
{
diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/ConnectionMetricsListener.java b/driver/src/main/java/org/neo4j/driver/internal/metrics/ConnectionMetricsListener.java
new file mode 100644
index 0000000000..a8b55a2787
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/internal/metrics/ConnectionMetricsListener.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2002-2018 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal.metrics;
+
+
+public interface ConnectionMetricsListener
+{
+ void beforeCreating( ListenerEvent listenerEvent );
+
+ void afterCreating( ListenerEvent listenerEvent );
+
+ void acquiredOrCreated( ListenerEvent listenerEvent );
+
+ void released(ListenerEvent listenerEvent);
+}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/ConnectionPoolMetricsListener.java b/driver/src/main/java/org/neo4j/driver/internal/metrics/ConnectionPoolMetricsListener.java
new file mode 100644
index 0000000000..93f414c7f9
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/internal/metrics/ConnectionPoolMetricsListener.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2002-2018 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal.metrics;
+
+public interface ConnectionPoolMetricsListener
+{
+ void beforeCreating();
+
+ void afterCreated();
+
+ void afterFailedToCreate();
+
+ void afterClosed();
+
+ void beforeAcquiringOrCreating( ListenerEvent listenerEvent );
+
+ void afterAcquiringOrCreating( ListenerEvent listenerEvent );
+}
+
diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/HistogramSnapshot.java b/driver/src/main/java/org/neo4j/driver/internal/metrics/HistogramSnapshot.java
new file mode 100644
index 0000000000..8959153abe
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/internal/metrics/HistogramSnapshot.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2002-2018 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal.metrics;
+
+import org.neo4j.driver.internal.metrics.spi.Histogram;
+
+public class HistogramSnapshot implements Histogram
+{
+ private final Histogram copy;
+ private final Histogram origin;
+
+ public HistogramSnapshot( Histogram copy, Histogram origin )
+ {
+ this.copy = copy;
+ this.origin = origin;
+ }
+
+ @Override
+ public long min()
+ {
+ return copy.min();
+ }
+
+ @Override
+ public long max()
+ {
+ return copy.max();
+ }
+
+ @Override
+ public double mean()
+ {
+ return copy.mean();
+ }
+
+ @Override
+ public double stdDeviation()
+ {
+ return copy.stdDeviation();
+ }
+
+ @Override
+ public long totalCount()
+ {
+ return copy.totalCount();
+ }
+
+ @Override
+ public long valueAtPercentile( double percentile )
+ {
+ return copy.valueAtPercentile( percentile );
+ }
+
+ @Override
+ public void reset()
+ {
+ origin.reset();
+ }
+
+ @Override
+ public String toString()
+ {
+ return copy.toString();
+ }
+}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/InternalAbstractMetrics.java b/driver/src/main/java/org/neo4j/driver/internal/metrics/InternalAbstractMetrics.java
new file mode 100644
index 0000000000..959bfe8361
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/internal/metrics/InternalAbstractMetrics.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright (c) 2002-2018 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal.metrics;
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.neo4j.driver.internal.BoltServerAddress;
+import org.neo4j.driver.internal.async.pool.ConnectionPoolImpl;
+import org.neo4j.driver.internal.metrics.spi.ConnectionMetrics;
+import org.neo4j.driver.internal.metrics.spi.ConnectionPoolMetrics;
+import org.neo4j.driver.internal.metrics.spi.Metrics;
+
+public abstract class InternalAbstractMetrics implements Metrics, MetricsListener
+{
+ public static final InternalAbstractMetrics DEV_NULL_METRICS = new InternalAbstractMetrics()
+ {
+
+ @Override
+ public void beforeCreating( BoltServerAddress serverAddress, ListenerEvent creatingEvent )
+ {
+ }
+
+ @Override
+ public void afterCreating( BoltServerAddress serverAddress, ListenerEvent creatingEvent )
+ {
+ }
+
+ @Override
+ public void afterCreated( BoltServerAddress serverAddress )
+ {
+ }
+
+ @Override
+ public void afterFailedToCreate( BoltServerAddress serverAddress )
+ {
+ }
+
+ @Override
+ public void afterClosed( BoltServerAddress serverAddress )
+ {
+ }
+
+ @Override
+ public void beforeAcquiringOrCreating( BoltServerAddress serverAddress, ListenerEvent acquireEvent )
+ {
+ }
+
+ @Override
+ public void afterAcquiringOrCreating( BoltServerAddress serverAddress, ListenerEvent acquireEvent )
+ {
+ }
+
+ @Override
+ public void afterAcquiredOrCreated( BoltServerAddress serverAddress, ListenerEvent inUseEvent )
+ {
+ }
+
+ @Override
+ public void afterReleased( BoltServerAddress serverAddress, ListenerEvent inUseEvent )
+ {
+ }
+
+ @Override
+ public ListenerEvent createListenerEvent()
+ {
+ return null;
+ }
+
+
+ @Override
+ public void addMetrics( BoltServerAddress address, ConnectionPoolImpl connectionPool )
+ {
+
+ }
+
+ @Override
+ public Map connectionPoolMetrics()
+ {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public Map connectionMetrics()
+ {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Driver metrics not available while driver metrics is not enabled.";
+ }
+ };
+}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/InternalConnectionMetrics.java b/driver/src/main/java/org/neo4j/driver/internal/metrics/InternalConnectionMetrics.java
new file mode 100644
index 0000000000..6db36f3b71
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/internal/metrics/InternalConnectionMetrics.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright (c) 2002-2018 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal.metrics;
+
+import java.time.Duration;
+import java.util.Objects;
+
+import org.neo4j.driver.internal.BoltServerAddress;
+import org.neo4j.driver.internal.metrics.spi.ConnectionMetrics;
+import org.neo4j.driver.internal.metrics.spi.Histogram;
+
+
+public class InternalConnectionMetrics implements ConnectionMetrics, ConnectionMetricsListener
+{
+ private final InternalHistogram connHistogram;
+ private final InternalHistogram inUseHistogram;
+ private final BoltServerAddress serverAddress;
+
+ public InternalConnectionMetrics( BoltServerAddress serverAddress, int connectionTimeoutMillis )
+ {
+ Objects.requireNonNull( serverAddress );
+ this.serverAddress = serverAddress;
+ connHistogram = new InternalHistogram( Duration.ofMillis( connectionTimeoutMillis ).toNanos() );
+ inUseHistogram = new InternalHistogram();
+ }
+
+ @Override
+ public String uniqueName()
+ {
+ return serverAddress.toString();
+ }
+
+ @Override
+ public Histogram connectionTimeHistogram()
+ {
+ return connHistogram.snapshot();
+ }
+
+ @Override
+ public Histogram inUseTimeHistogram()
+ {
+ return inUseHistogram.snapshot();
+ }
+
+ @Override
+ public void beforeCreating( ListenerEvent connEvent )
+ {
+ // creating a conn
+ connEvent.start();
+ }
+
+ @Override
+ public void afterCreating( ListenerEvent connEvent )
+ {
+ // finished conn creation
+ long elapsed = connEvent.elapsed();
+ connHistogram.recordValue( elapsed );
+ }
+
+ @Override
+ public void acquiredOrCreated( ListenerEvent inUseEvent )
+ {
+ // created + acquired = inUse
+ inUseEvent.start();
+ }
+
+ @Override
+ public void released(ListenerEvent inUseEvent)
+ {
+ // idle
+ long elapsed = inUseEvent.elapsed();
+ inUseHistogram.recordValue( elapsed );
+ }
+
+ @Override
+ public String toString()
+ {
+ return String.format( "connectionTimeHistogram=%s, inUseTimeHistogram=%s", connectionTimeHistogram(), inUseTimeHistogram() );
+ }
+
+}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/InternalConnectionPoolMetrics.java b/driver/src/main/java/org/neo4j/driver/internal/metrics/InternalConnectionPoolMetrics.java
new file mode 100644
index 0000000000..47130fef65
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/internal/metrics/InternalConnectionPoolMetrics.java
@@ -0,0 +1,164 @@
+/*
+ * Copyright (c) 2002-2018 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.neo4j.driver.internal.metrics;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.neo4j.driver.internal.BoltServerAddress;
+import org.neo4j.driver.internal.metrics.spi.ConnectionPoolMetrics;
+import org.neo4j.driver.internal.metrics.spi.Histogram;
+import org.neo4j.driver.internal.metrics.spi.PoolStatus;
+import org.neo4j.driver.internal.spi.ConnectionPool;
+
+import static java.lang.String.format;
+import static org.neo4j.driver.internal.metrics.InternalMetrics.serverAddressToUniqueName;
+
+public class InternalConnectionPoolMetrics implements ConnectionPoolMetrics, ConnectionPoolMetricsListener
+{
+ private final BoltServerAddress address;
+ private final ConnectionPool pool;
+
+ private AtomicLong created = new AtomicLong();
+ private AtomicLong closed = new AtomicLong();
+ private AtomicInteger creating = new AtomicInteger();
+ private AtomicLong failedToCreate = new AtomicLong();
+
+ private InternalHistogram acquisitionTimeHistogram;
+
+ public InternalConnectionPoolMetrics( BoltServerAddress address, ConnectionPool pool, long connAcquisitionTimeoutMs )
+ {
+ Objects.requireNonNull( address );
+ Objects.requireNonNull( pool );
+
+ this.address = address;
+ this.pool = pool;
+ this.acquisitionTimeHistogram = new InternalHistogram( Duration.ofMillis( connAcquisitionTimeoutMs ).toNanos() );
+ }
+
+ @Override
+ public void beforeCreating()
+ {
+ creating.incrementAndGet();
+ }
+
+ @Override
+ public void afterFailedToCreate()
+ {
+ failedToCreate.incrementAndGet();
+ creating.decrementAndGet();
+ }
+
+ @Override
+ public void afterCreated()
+ {
+ created.incrementAndGet();
+ creating.decrementAndGet();
+ }
+
+ @Override
+ public void afterClosed()
+ {
+ closed.incrementAndGet();
+ }
+
+ @Override
+ public void beforeAcquiringOrCreating( ListenerEvent listenerEvent )
+ {
+ listenerEvent.start();
+ }
+
+ @Override
+ public void afterAcquiringOrCreating( ListenerEvent listenerEvent )
+ {
+ long elapsed = listenerEvent.elapsed();
+ acquisitionTimeHistogram.recordValue( elapsed );
+ }
+
+ @Override
+ public String uniqueName()
+ {
+ return serverAddressToUniqueName( address );
+ }
+
+ @Override
+ public PoolStatus poolStatus()
+ {
+ if ( pool.isOpen() )
+ {
+ return PoolStatus.Open;
+ }
+ else
+ {
+ return PoolStatus.Closed;
+ }
+ }
+
+ @Override
+ public int inUse()
+ {
+ return pool.inUseConnections( address );
+ }
+
+ @Override
+ public int idle()
+ {
+ return pool.idleConnections( address );
+ }
+
+ @Override
+ public int creating()
+ {
+ return creating.get();
+ }
+
+ @Override
+ public long created()
+ {
+ return created.get();
+ }
+
+ @Override
+ public long failedToCreate()
+ {
+ return failedToCreate.get();
+ }
+
+ @Override
+ public long closed()
+ {
+ return closed.get();
+ }
+
+ @Override
+ public Histogram acquisitionTimeHistogram()
+ {
+ return this.acquisitionTimeHistogram.snapshot();
+ }
+
+ @Override
+ public String toString()
+ {
+ return format( "[created=%s, closed=%s, creating=%s, failedToCreate=%s inUse=%s, idle=%s, poolStatus=%s, acquisitionTimeHistogram=%s]", created(),
+ closed(), creating(), failedToCreate(), inUse(), idle(), poolStatus(), acquisitionTimeHistogram() );
+ }
+}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/InternalHistogram.java b/driver/src/main/java/org/neo4j/driver/internal/metrics/InternalHistogram.java
new file mode 100644
index 0000000000..d0a89e995b
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/internal/metrics/InternalHistogram.java
@@ -0,0 +1,128 @@
+/*
+ * Copyright (c) 2002-2018 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal.metrics;
+
+import org.HdrHistogram.AbstractHistogram;
+import org.HdrHistogram.ConcurrentHistogram;
+
+import java.time.Duration;
+
+import org.neo4j.driver.internal.metrics.spi.Histogram;
+
+import static java.lang.String.format;
+
+public class InternalHistogram implements Histogram
+{
+ private static final long DEFAULT_HIGHEST_TRACKABLE_NS = Duration.ofMinutes( 10 ).toNanos();
+ private static final int DEFAULT_NUMBER_OF_SIGNIFICANT_VALUE_DIGITS = 3;
+
+ private final AbstractHistogram delegate;
+
+ public InternalHistogram()
+ {
+ this( DEFAULT_HIGHEST_TRACKABLE_NS );
+ }
+
+ public InternalHistogram( long highestTrackableValueNS )
+ {
+ this.delegate = createHdrHistogram( highestTrackableValueNS );
+ }
+
+ public InternalHistogram( AbstractHistogram histogram )
+ {
+ this.delegate = histogram;
+ }
+
+ public void recordValue( long value )
+ {
+ long newValue = truncateValue( value, delegate );
+ this.delegate.recordValue( newValue );
+ }
+
+ @Override
+ public long min()
+ {
+ return delegate.getMinValue();
+ }
+
+ @Override
+ public long max()
+ {
+ return delegate.getMaxValue();
+ }
+
+ @Override
+ public double mean()
+ {
+ return delegate.getMean();
+ }
+
+ @Override
+ public double stdDeviation()
+ {
+ return delegate.getStdDeviation();
+ }
+
+ @Override
+ public long totalCount()
+ {
+ return delegate.getTotalCount();
+ }
+
+ @Override
+ public long valueAtPercentile( double percentile )
+ {
+ return delegate.getValueAtPercentile( percentile );
+ }
+
+ @Override
+ public void reset()
+ {
+ delegate.reset();
+ }
+
+ public static ConcurrentHistogram createHdrHistogram( long highestTrackableValue )
+ {
+ return new ConcurrentHistogram( highestTrackableValue, DEFAULT_NUMBER_OF_SIGNIFICANT_VALUE_DIGITS );
+ }
+
+ private static long truncateValue( long value, AbstractHistogram histogram )
+ {
+ if ( value > histogram.getHighestTrackableValue() )
+ {
+ return histogram.getHighestTrackableValue();
+ }
+ else
+ {
+ return value;
+ }
+ }
+
+ public Histogram snapshot()
+ {
+ return new HistogramSnapshot( new InternalHistogram( this.delegate.copy() ), this );
+ }
+
+ @Override
+ public String toString()
+ {
+ return format("[min=%sns, max=%sns, mean=%sns, stdDeviation=%s, totalCount=%s]",
+ min(), max(), mean(), stdDeviation(), totalCount());
+ }
+}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/InternalMetrics.java b/driver/src/main/java/org/neo4j/driver/internal/metrics/InternalMetrics.java
new file mode 100644
index 0000000000..23b83587b7
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/internal/metrics/InternalMetrics.java
@@ -0,0 +1,172 @@
+/*
+ * Copyright (c) 2002-2018 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal.metrics;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.neo4j.driver.internal.BoltServerAddress;
+import org.neo4j.driver.internal.async.pool.ConnectionPoolImpl;
+import org.neo4j.driver.internal.metrics.spi.ConnectionMetrics;
+import org.neo4j.driver.internal.metrics.spi.ConnectionPoolMetrics;
+import org.neo4j.driver.internal.spi.ConnectionPool;
+import org.neo4j.driver.v1.Config;
+import org.neo4j.driver.v1.exceptions.ClientException;
+
+import static java.lang.String.format;
+
+public class InternalMetrics extends InternalAbstractMetrics
+{
+ private final Map connectionPoolMetrics;
+ private final Map connectionMetrics;
+ private final Config config;
+
+ public InternalMetrics( Config config )
+ {
+ Objects.requireNonNull( config );
+ this.config = config;
+ this.connectionPoolMetrics = new ConcurrentHashMap<>();
+ this.connectionMetrics = new ConcurrentHashMap<>();
+ }
+
+ @Override
+ public void addMetrics( BoltServerAddress serverAddress, ConnectionPoolImpl pool )
+ {
+ addPoolMetrics( serverAddress, pool );
+ addConnectionMetrics( serverAddress );
+ }
+
+ @Override
+ public void beforeCreating( BoltServerAddress serverAddress, ListenerEvent creatingEvent )
+ {
+ poolMetrics( serverAddress ).beforeCreating();
+ connectionMetrics( serverAddress ).beforeCreating( creatingEvent );
+ }
+
+ @Override
+ public void afterCreating( BoltServerAddress serverAddress, ListenerEvent creatingEvent )
+ {
+ connectionMetrics( serverAddress ).afterCreating( creatingEvent );
+ }
+
+ @Override
+ public void afterCreated( BoltServerAddress serverAddress )
+ {
+ poolMetrics( serverAddress ).afterCreated();
+ }
+
+ @Override
+ public void afterFailedToCreate( BoltServerAddress serverAddress )
+ {
+ poolMetrics( serverAddress ).afterFailedToCreate();
+ }
+
+ @Override
+ public void afterClosed( BoltServerAddress serverAddress )
+ {
+ poolMetrics( serverAddress ).afterClosed();
+ }
+
+ @Override
+ public void beforeAcquiringOrCreating( BoltServerAddress serverAddress, ListenerEvent listenerEvent )
+ {
+ poolMetrics( serverAddress ).beforeAcquiringOrCreating( listenerEvent );
+ }
+
+ @Override
+ public void afterAcquiringOrCreating( BoltServerAddress serverAddress, ListenerEvent listenerEvent )
+ {
+ poolMetrics( serverAddress ).afterAcquiringOrCreating( listenerEvent );
+ }
+
+ @Override
+ public void afterAcquiredOrCreated( BoltServerAddress serverAddress, ListenerEvent inUseEvent )
+ {
+ connectionMetrics( serverAddress ).acquiredOrCreated( inUseEvent );
+ }
+
+ @Override
+ public void afterReleased( BoltServerAddress serverAddress, ListenerEvent inUseEvent )
+ {
+ connectionMetrics( serverAddress ).released( inUseEvent );
+ }
+
+ @Override
+ public ListenerEvent createListenerEvent()
+ {
+ return new NanoTimeBasedListenerEvent();
+ }
+
+ @Override
+ public Map connectionPoolMetrics()
+ {
+ return this.connectionPoolMetrics;
+ }
+
+ @Override
+ public Map connectionMetrics()
+ {
+ return this.connectionMetrics;
+ }
+
+ @Override
+ public String toString()
+ {
+ return format( "PoolMetrics=%s, ConnMetrics=%s", connectionPoolMetrics, connectionMetrics );
+ }
+
+ static String serverAddressToUniqueName( BoltServerAddress serverAddress )
+ {
+ return serverAddress.toString();
+ }
+
+ private ConnectionPoolMetricsListener poolMetrics( BoltServerAddress serverAddress )
+ {
+ InternalConnectionPoolMetrics poolMetrics =
+ (InternalConnectionPoolMetrics) this.connectionPoolMetrics.get( serverAddressToUniqueName( serverAddress ) );
+ if ( poolMetrics == null )
+ {
+ throw new ClientException( format( "Failed to find pool metrics for server `%s` in %s", serverAddress, this.connectionPoolMetrics ) );
+ }
+ return poolMetrics;
+ }
+
+ private ConnectionMetricsListener connectionMetrics( BoltServerAddress serverAddress )
+ {
+ InternalConnectionMetrics connMetrics = (InternalConnectionMetrics) this.connectionMetrics.get( serverAddressToUniqueName( serverAddress ) );
+ if ( connMetrics == null )
+ {
+ throw new ClientException( format( "Failed to find connection metrics for server `%s` in %s", serverAddress, this.connectionMetrics ) );
+ }
+ return connMetrics;
+ }
+
+ private void addPoolMetrics( BoltServerAddress serverAddress, ConnectionPool pool )
+ {
+ this.connectionPoolMetrics.put( serverAddressToUniqueName( serverAddress ),
+ new InternalConnectionPoolMetrics( serverAddress, pool, config.connectionAcquisitionTimeoutMillis() ) );
+ }
+
+ private void addConnectionMetrics( BoltServerAddress serverAddress )
+ {
+ this.connectionMetrics.put( serverAddressToUniqueName( serverAddress ),
+ new InternalConnectionMetrics( serverAddress, config.connectionTimeoutMillis() ) );
+ }
+}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/ListenerEvent.java b/driver/src/main/java/org/neo4j/driver/internal/metrics/ListenerEvent.java
new file mode 100644
index 0000000000..6eece7d235
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/internal/metrics/ListenerEvent.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright (c) 2002-2018 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal.metrics;
+
+
+public interface ListenerEvent
+{
+ void start();
+ long elapsed();
+}
+
diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/MetricsListener.java b/driver/src/main/java/org/neo4j/driver/internal/metrics/MetricsListener.java
new file mode 100644
index 0000000000..41d337b1ac
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/internal/metrics/MetricsListener.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright (c) 2002-2018 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal.metrics;
+
+import org.neo4j.driver.internal.BoltServerAddress;
+import org.neo4j.driver.internal.async.NettyConnection;
+import org.neo4j.driver.internal.async.pool.ConnectionPoolImpl;
+
+public interface MetricsListener
+{
+ /**
+ * Before creating a netty channel
+ * @param serverAddress the server the netty channel binds to.
+ * @param creatingEvent a connection listener event registered when a connection is creating.
+ */
+ void beforeCreating( BoltServerAddress serverAddress, ListenerEvent creatingEvent );
+
+ /**
+ * After creating a netty channel regardless succeeded or failed.
+ * @param serverAddress the server the netty channel binds to.
+ * @param creatingEvent a connection listener event registered when a connection is creating.
+ */
+ void afterCreating( BoltServerAddress serverAddress, ListenerEvent creatingEvent );
+
+ /**
+ * After a netty channel is created successfully
+ * This method will not invoke {@link this#afterCreating(BoltServerAddress, ListenerEvent)}
+ * @param serverAddress the server the netty channel binds to
+ */
+ void afterCreated( BoltServerAddress serverAddress );
+
+ /**
+ * After a netty channel is created with failure
+ * This method will not invoke {@link this#afterCreating(BoltServerAddress, ListenerEvent)}
+ * @param serverAddress the server the netty channel binds to
+ */
+ void afterFailedToCreate( BoltServerAddress serverAddress );
+
+ /**
+ * After a netty channel is closed successfully
+ * @param serverAddress the server the netty channel binds to
+ */
+ void afterClosed( BoltServerAddress serverAddress );
+
+ /**
+ * Before acquiring or creating a new netty channel from pool
+ * @param serverAddress the server the netty channel binds to
+ * @param acquireEvent a pool listener event registered in pool for this acquire event
+ */
+ void beforeAcquiringOrCreating( BoltServerAddress serverAddress, ListenerEvent acquireEvent );
+
+ /**
+ * After acquiring or creating a new netty channel from pool regardless succeeded or failed
+ * @param serverAddress the server the netty channel binds to
+ * @param acquireEvent a pool listener event registered in pool for this acquire event
+ */
+ void afterAcquiringOrCreating( BoltServerAddress serverAddress, ListenerEvent acquireEvent );
+
+ /**
+ * After acquiring or creating a new netty channel from pool successfully.
+ * This method will not invoke {@link this#afterAcquiringOrCreating(BoltServerAddress, ListenerEvent)}
+ * @param serverAddress the server the netty channel binds to
+ * @param inUseEvent a connection listener registered with a {@link NettyConnection} when created
+ */
+ void afterAcquiredOrCreated( BoltServerAddress serverAddress, ListenerEvent inUseEvent );
+
+ /**
+ * After releasing a netty channel back to pool successfully
+ * @param serverAddress the server the netty channel binds to
+ * @param inUseEvent a connection listener registered with a {@link NettyConnection} when destroyed
+ */
+ void afterReleased( BoltServerAddress serverAddress, ListenerEvent inUseEvent );
+
+ ListenerEvent createListenerEvent();
+
+ void addMetrics( BoltServerAddress address, ConnectionPoolImpl connectionPool );
+}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/NanoTimeBasedListenerEvent.java b/driver/src/main/java/org/neo4j/driver/internal/metrics/NanoTimeBasedListenerEvent.java
new file mode 100644
index 0000000000..a2c315c4fd
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/internal/metrics/NanoTimeBasedListenerEvent.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2002-2018 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.neo4j.driver.internal.metrics;
+
+public class NanoTimeBasedListenerEvent implements ListenerEvent
+{
+ private long startNanoTime;
+
+ @Override
+ public void start()
+ {
+ startNanoTime = System.nanoTime();
+ }
+
+ @Override
+ public long elapsed()
+ {
+ return System.nanoTime() - startNanoTime;
+ }
+}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/spi/ConnectionMetrics.java b/driver/src/main/java/org/neo4j/driver/internal/metrics/spi/ConnectionMetrics.java
new file mode 100644
index 0000000000..a0f5f773bb
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/internal/metrics/spi/ConnectionMetrics.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2002-2018 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal.metrics.spi;
+
+public interface ConnectionMetrics
+{
+ /**
+ * The unique name of this connection metrics among all connection metrics
+ * @return An unique name of this connection metrics among all connection metrics
+ */
+ String uniqueName();
+
+ /**
+ * The connection time histogram describes how long it takes to establish a connection
+ * @return The connection time histogram
+ */
+ Histogram connectionTimeHistogram ();
+
+ /**
+ * The in-use time histogram records how long each connection is borrowed out of the pool
+ * @return The in-use time histogram
+ */
+ Histogram inUseTimeHistogram();
+}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/spi/ConnectionPoolMetrics.java b/driver/src/main/java/org/neo4j/driver/internal/metrics/spi/ConnectionPoolMetrics.java
new file mode 100644
index 0000000000..40a8afddab
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/internal/metrics/spi/ConnectionPoolMetrics.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright (c) 2002-2018 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal.metrics.spi;
+
+public interface ConnectionPoolMetrics
+{
+ /**
+ * An unique name that identifies this connection pool metrics among all others
+ * @return An unique name
+ */
+ String uniqueName();
+
+ /**
+ * The status of the pool
+ * @return The status of the pool in a string
+ */
+ PoolStatus poolStatus();
+
+ /**
+ * The amount of connections that is in-use (borrowed out of the pool).
+ * The number is changing up and down from time to time.
+ * @return The amount of connections that is in-use
+ */
+ int inUse();
+
+ /**
+ * The amount of connections that is idle (buffered inside the pool).
+ * The number is changing up and down from time to time.
+ * @return The amount of connections that is idle.
+ */
+ int idle();
+
+ /**
+ * The amount of connections that is going to be created.
+ * The amount is increased by one when the pool noticed a request to create a new connection.
+ * The amount is decreased by one when the pool noticed a new connection is created regardless successfully or not.
+ * The number is changing up and down from time to time.
+ * @return The amount of connection that is creating.
+ */
+ int creating();
+
+ /**
+ * An increasing-only number to record how many connections have been created by this pool successfully.
+ * @return The amount of connections have ever been created by this pool.
+ */
+ long created();
+
+ /**
+ * An increasing-only number to record how many connections have been failed to create.
+ * @return The amount of connections have been failed to create by this pool.
+ */
+ long failedToCreate();
+
+ /**
+ * An increasing-only number to record how many connections have been closed by this pool.
+ * @return The amount of connections have been closed by this pool.
+ */
+ long closed();
+
+ /**
+ * An acquisition time histogram records how long it takes to acquire an connection from this pool.
+ * The connection acquired from the pool could either be a connection idling inside the pool or a connection created by the pool.
+ * @return The acquisition time histogram.
+ */
+ Histogram acquisitionTimeHistogram();
+}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/spi/Histogram.java b/driver/src/main/java/org/neo4j/driver/internal/metrics/spi/Histogram.java
new file mode 100644
index 0000000000..a05948d48c
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/internal/metrics/spi/Histogram.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright (c) 2002-2018 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal.metrics.spi;
+
+public interface Histogram
+{
+ /**
+ * The minimum value recorded in this histogram.
+ * @return The minimum value recorded in this histogram.
+ */
+ long min();
+
+ /**
+ * The maximum value recorded in this histogram.
+ * @return The maximum value recorded in this histogram.
+ */
+ long max();
+
+ /**
+ * The mean value of this histogram.
+ * @return The mean value.
+ */
+ double mean();
+
+ /**
+ * The standard deviation of this histogram.
+ * @return The standard deviation.
+ */
+ double stdDeviation();
+
+ /**
+ * The total count of the values recorded in this histogram.
+ * @return The total number of values.
+ */
+ long totalCount();
+
+ /**
+ * Returns the value at the given percentile.
+ * @param percentile The interested percentile such as 50 (mean value), 80 etc.
+ * @return The value at the given percentile.
+ */
+ long valueAtPercentile(double percentile);
+
+ /**
+ * Reset cleans all the values recorded in this histogram.
+ */
+ void reset();
+}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/spi/Metrics.java b/driver/src/main/java/org/neo4j/driver/internal/metrics/spi/Metrics.java
new file mode 100644
index 0000000000..ca04a88d62
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/internal/metrics/spi/Metrics.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2002-2018 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal.metrics.spi;
+
+import java.util.Map;
+
+public interface Metrics
+{
+ // TODO Once this interface become public, find a better way to enable metrics and detect metrics availability.
+ String DRIVER_METRICS_ENABLED_KEY = "driver.metrics.enabled";
+ static boolean isMetricsEnabled()
+ {
+ return Boolean.getBoolean( DRIVER_METRICS_ENABLED_KEY );
+ }
+
+ /**
+ * A map of connection pool metrics.
+ * The {@link ConnectionPoolMetrics#uniqueName()} are used as the keys of the map.
+ * @return The connection pool metrics.
+ */
+ Map connectionPoolMetrics();
+
+ /***
+ * A map of connection metrics.
+ * The {@link ConnectionMetrics#uniqueName()} are used as the keys of the map.
+ * @return The connection metrics.
+ */
+ Map connectionMetrics();
+
+}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/spi/PoolStatus.java b/driver/src/main/java/org/neo4j/driver/internal/metrics/spi/PoolStatus.java
new file mode 100644
index 0000000000..6fe7099805
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/internal/metrics/spi/PoolStatus.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2002-2018 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal.metrics.spi;
+
+public enum PoolStatus
+{
+ Open, Closed
+}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionPool.java b/driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionPool.java
index ca3de53f41..73c7f6d86b 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionPool.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionPool.java
@@ -29,7 +29,11 @@ public interface ConnectionPool
void retainAll( Set addressesToRetain );
- int activeConnections( BoltServerAddress address );
+ int inUseConnections( BoltServerAddress address );
+
+ int idleConnections( BoltServerAddress address );
CompletionStage close();
+
+ boolean isOpen();
}
diff --git a/driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java b/driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java
index 5235dc0618..0257ee2cce 100644
--- a/driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java
+++ b/driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java
@@ -33,6 +33,9 @@
import org.neo4j.driver.internal.async.BootstrapFactory;
import org.neo4j.driver.internal.cluster.RoutingSettings;
import org.neo4j.driver.internal.cluster.loadbalancing.LoadBalancer;
+import org.neo4j.driver.internal.metrics.InternalMetrics;
+import org.neo4j.driver.internal.metrics.MetricsListener;
+import org.neo4j.driver.internal.metrics.spi.Metrics;
import org.neo4j.driver.internal.retry.RetryLogic;
import org.neo4j.driver.internal.retry.RetrySettings;
import org.neo4j.driver.internal.security.SecurityPlan;
@@ -47,6 +50,7 @@
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -56,6 +60,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.neo4j.driver.internal.metrics.InternalAbstractMetrics.DEV_NULL_METRICS;
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
import static org.neo4j.driver.internal.util.Futures.failedFuture;
import static org.neo4j.driver.v1.AccessMode.READ;
@@ -175,6 +180,30 @@ public void shouldThrowWhenUnableToVerifyConnectivity()
}
}
+ @Test
+ public void shouldNotCreateDriverMetrics() throws Throwable
+ {
+ // Given
+ Config config = mock( Config.class );
+ // When
+ MetricsListener handler = DriverFactory.createDriverMetrics( config );
+ // Then
+ assertThat( handler, is( DEV_NULL_METRICS ) );
+ }
+
+ @Test
+ public void shouldCreateDriverMetricsIfMonitoringEnabled() throws Throwable
+ {
+ // Given
+ Config config = mock( Config.class );
+ System.setProperty( "driver.metrics.enabled", "True" );
+ // When
+ MetricsListener handler = DriverFactory.createDriverMetrics( config );
+ System.setProperty( "driver.metrics.enabled", "False" );
+ // Then
+ assertThat( handler instanceof InternalMetrics, is( true ) );
+ }
+
private Driver createDriver( DriverFactory driverFactory )
{
return createDriver( driverFactory, defaultConfig() );
@@ -206,22 +235,20 @@ private static class ThrowingDriverFactory extends DriverFactory
}
@Override
- protected InternalDriver createDriver( SessionFactory sessionFactory, SecurityPlan securityPlan, Config config )
+ protected InternalDriver createDriver( SecurityPlan securityPlan, SessionFactory sessionFactory, Metrics metrics, Config config )
{
throw new UnsupportedOperationException( "Can't create direct driver" );
}
@Override
- protected InternalDriver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool,
- Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic,
- EventExecutorGroup eventExecutorGroup )
+ protected InternalDriver createRoutingDriver( SecurityPlan securityPlan, BoltServerAddress address, ConnectionPool connectionPool,
+ EventExecutorGroup eventExecutorGroup, RoutingSettings routingSettings, RetryLogic retryLogic, Metrics metrics, Config config )
{
throw new UnsupportedOperationException( "Can't create routing driver" );
}
@Override
- protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan,
- Bootstrap bootstrap, Config config )
+ protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap, MetricsListener metrics, Config config )
{
return connectionPool;
}
@@ -232,7 +259,7 @@ private static class SessionFactoryCapturingDriverFactory extends DriverFactory
SessionFactory capturedSessionFactory;
@Override
- protected InternalDriver createDriver( SessionFactory sessionFactory, SecurityPlan securityPlan, Config config )
+ protected InternalDriver createDriver( SecurityPlan securityPlan, SessionFactory sessionFactory, Metrics metrics, Config config )
{
InternalDriver driver = mock( InternalDriver.class );
when( driver.verifyConnectivity() ).thenReturn( completedWithNull() );
@@ -256,8 +283,7 @@ protected SessionFactory createSessionFactory( ConnectionProvider connectionProv
}
@Override
- protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan,
- Bootstrap bootstrap, Config config )
+ protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap, MetricsListener metrics, Config config )
{
return connectionPoolMock();
}
@@ -279,8 +305,7 @@ protected Bootstrap createBootstrap()
}
@Override
- protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan,
- Bootstrap bootstrap, Config config )
+ protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap, MetricsListener metrics, Config config )
{
return connectionPoolMock();
}
diff --git a/driver/src/test/java/org/neo4j/driver/internal/InternalDriverTest.java b/driver/src/test/java/org/neo4j/driver/internal/InternalDriverTest.java
index de6ce07178..9f39189cb7 100644
--- a/driver/src/test/java/org/neo4j/driver/internal/InternalDriverTest.java
+++ b/driver/src/test/java/org/neo4j/driver/internal/InternalDriverTest.java
@@ -30,6 +30,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
+import static org.neo4j.driver.internal.metrics.InternalAbstractMetrics.DEV_NULL_METRICS;
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
import static org.neo4j.driver.v1.util.TestUtil.await;
@@ -72,7 +73,7 @@ public void shouldVerifyConnectivity()
private static InternalDriver newDriver( SessionFactory sessionFactory )
{
- return new InternalDriver( SecurityPlan.insecure(), sessionFactory, DEV_NULL_LOGGING );
+ return new InternalDriver( SecurityPlan.insecure(), sessionFactory, DEV_NULL_METRICS, DEV_NULL_LOGGING );
}
private static SessionFactory sessionFactoryMock()
diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java
index b96f39b6e8..88955e8cd0 100644
--- a/driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java
+++ b/driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java
@@ -60,6 +60,7 @@
import static org.neo4j.driver.internal.async.ChannelAttributes.terminationReason;
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
import static org.neo4j.driver.internal.messaging.ResetMessage.RESET;
+import static org.neo4j.driver.internal.metrics.InternalAbstractMetrics.DEV_NULL_METRICS;
import static org.neo4j.driver.internal.util.Iterables.single;
import static org.neo4j.driver.v1.util.DaemonThreadFactory.daemon;
@@ -497,7 +498,7 @@ private static NettyConnection newConnection( Channel channel )
private static NettyConnection newConnection( Channel channel, ChannelPool pool )
{
- return new NettyConnection( channel, pool, new FakeClock() );
+ return new NettyConnection( channel, pool, new FakeClock(), DEV_NULL_METRICS );
}
private static void assertConnectionReleasedError( IllegalStateException e )
diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/pool/ActiveChannelTrackerTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/pool/ActiveChannelTrackerTest.java
deleted file mode 100644
index 9c2c747c95..0000000000
--- a/driver/src/test/java/org/neo4j/driver/internal/async/pool/ActiveChannelTrackerTest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Copyright (c) 2002-2018 "Neo Technology,"
- * Network Engine for Objects in Lund AB [http://neotechnology.com]
- *
- * This file is part of Neo4j.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.neo4j.driver.internal.async.pool;
-
-import io.netty.channel.Channel;
-import io.netty.channel.embedded.EmbeddedChannel;
-import org.junit.Test;
-
-import org.neo4j.driver.internal.BoltServerAddress;
-
-import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-import static org.neo4j.driver.internal.async.ChannelAttributes.setServerAddress;
-import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
-
-public class ActiveChannelTrackerTest
-{
- private final BoltServerAddress address = BoltServerAddress.LOCAL_DEFAULT;
- private final ActiveChannelTracker tracker = new ActiveChannelTracker( DEV_NULL_LOGGING );
-
- @Test
- public void shouldIncrementCountWhenChannelCreated()
- {
- Channel channel = newChannel();
- assertEquals( 0, tracker.activeChannelCount( address ) );
-
- tracker.channelCreated( channel );
- assertEquals( 1, tracker.activeChannelCount( address ) );
- }
-
- @Test
- public void shouldIncrementCountForAddress()
- {
- Channel channel1 = newChannel();
- Channel channel2 = newChannel();
- Channel channel3 = newChannel();
-
- assertEquals( 0, tracker.activeChannelCount( address ) );
- tracker.channelAcquired( channel1 );
- assertEquals( 1, tracker.activeChannelCount( address ) );
- tracker.channelAcquired( channel2 );
- assertEquals( 2, tracker.activeChannelCount( address ) );
- tracker.channelAcquired( channel3 );
- assertEquals( 3, tracker.activeChannelCount( address ) );
- }
-
- @Test
- public void shouldDecrementCountForAddress()
- {
- Channel channel1 = newChannel();
- Channel channel2 = newChannel();
- Channel channel3 = newChannel();
-
- tracker.channelAcquired( channel1 );
- tracker.channelAcquired( channel2 );
- tracker.channelAcquired( channel3 );
- assertEquals( 3, tracker.activeChannelCount( address ) );
-
- tracker.channelReleased( channel1 );
- assertEquals( 2, tracker.activeChannelCount( address ) );
- tracker.channelReleased( channel2 );
- assertEquals( 1, tracker.activeChannelCount( address ) );
- tracker.channelReleased( channel3 );
- assertEquals( 0, tracker.activeChannelCount( address ) );
- }
-
- @Test
- public void shouldThrowWhenDecrementingForUnknownAddress()
- {
- Channel channel = newChannel();
-
- try
- {
- tracker.channelReleased( channel );
- fail( "Exception expected" );
- }
- catch ( Exception e )
- {
- assertThat( e, instanceOf( IllegalStateException.class ) );
- }
- }
-
- @Test
- public void shouldReturnZeroActiveCountForUnknownAddress()
- {
- assertEquals( 0, tracker.activeChannelCount( address ) );
- }
-
- private Channel newChannel()
- {
- EmbeddedChannel channel = new EmbeddedChannel();
- setServerAddress( channel, address );
- return channel;
- }
-}
diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImplTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImplTest.java
index f6a4c496e5..3f4f9dde66 100644
--- a/driver/src/test/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImplTest.java
+++ b/driver/src/test/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImplTest.java
@@ -59,6 +59,7 @@
import static org.mockito.Mockito.when;
import static org.neo4j.driver.internal.BoltServerAddress.LOCAL_DEFAULT;
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
+import static org.neo4j.driver.internal.metrics.InternalAbstractMetrics.DEV_NULL_METRICS;
import static org.neo4j.driver.v1.util.TestUtil.await;
public class ConnectionPoolImplTest
@@ -146,19 +147,19 @@ public void shouldNotCloseWhenClosed()
@Test
public void shouldDoNothingWhenRetainOnEmptyPool()
{
- ActiveChannelTracker activeChannelTracker = mock( ActiveChannelTracker.class );
- TestConnectionPool pool = new TestConnectionPool( activeChannelTracker );
+ NettyChannelTracker nettyChannelTracker = mock( NettyChannelTracker.class );
+ TestConnectionPool pool = new TestConnectionPool( nettyChannelTracker );
pool.retainAll( singleton( LOCAL_DEFAULT ) );
- verifyZeroInteractions( activeChannelTracker );
+ verifyZeroInteractions( nettyChannelTracker );
}
@Test
public void shouldRetainSpecifiedAddresses()
{
- ActiveChannelTracker activeChannelTracker = mock( ActiveChannelTracker.class );
- TestConnectionPool pool = new TestConnectionPool( activeChannelTracker );
+ NettyChannelTracker nettyChannelTracker = mock( NettyChannelTracker.class );
+ TestConnectionPool pool = new TestConnectionPool( nettyChannelTracker );
pool.acquire( ADDRESS_1 );
pool.acquire( ADDRESS_2 );
@@ -174,16 +175,16 @@ public void shouldRetainSpecifiedAddresses()
@Test
public void shouldClosePoolsWhenRetaining()
{
- ActiveChannelTracker activeChannelTracker = mock( ActiveChannelTracker.class );
- TestConnectionPool pool = new TestConnectionPool( activeChannelTracker );
+ NettyChannelTracker nettyChannelTracker = mock( NettyChannelTracker.class );
+ TestConnectionPool pool = new TestConnectionPool( nettyChannelTracker );
pool.acquire( ADDRESS_1 );
pool.acquire( ADDRESS_2 );
pool.acquire( ADDRESS_3 );
- when( activeChannelTracker.activeChannelCount( ADDRESS_1 ) ).thenReturn( 2 );
- when( activeChannelTracker.activeChannelCount( ADDRESS_2 ) ).thenReturn( 0 );
- when( activeChannelTracker.activeChannelCount( ADDRESS_3 ) ).thenReturn( 3 );
+ when( nettyChannelTracker.inUseChannelCount( ADDRESS_1 ) ).thenReturn( 2 );
+ when( nettyChannelTracker.inUseChannelCount( ADDRESS_2 ) ).thenReturn( 0 );
+ when( nettyChannelTracker.inUseChannelCount( ADDRESS_3 ) ).thenReturn( 3 );
pool.retainAll( new HashSet<>( asList( ADDRESS_1, ADDRESS_3 ) ) );
verify( pool.getPool( ADDRESS_1 ), never() ).close();
@@ -194,16 +195,16 @@ public void shouldClosePoolsWhenRetaining()
@Test
public void shouldNotClosePoolsWithActiveConnectionsWhenRetaining()
{
- ActiveChannelTracker activeChannelTracker = mock( ActiveChannelTracker.class );
- TestConnectionPool pool = new TestConnectionPool( activeChannelTracker );
+ NettyChannelTracker nettyChannelTracker = mock( NettyChannelTracker.class );
+ TestConnectionPool pool = new TestConnectionPool( nettyChannelTracker );
pool.acquire( ADDRESS_1 );
pool.acquire( ADDRESS_2 );
pool.acquire( ADDRESS_3 );
- when( activeChannelTracker.activeChannelCount( ADDRESS_1 ) ).thenReturn( 1 );
- when( activeChannelTracker.activeChannelCount( ADDRESS_2 ) ).thenReturn( 42 );
- when( activeChannelTracker.activeChannelCount( ADDRESS_3 ) ).thenReturn( 0 );
+ when( nettyChannelTracker.inUseChannelCount( ADDRESS_1 ) ).thenReturn( 1 );
+ when( nettyChannelTracker.inUseChannelCount( ADDRESS_2 ) ).thenReturn( 42 );
+ when( nettyChannelTracker.inUseChannelCount( ADDRESS_3 ) ).thenReturn( 0 );
pool.retainAll( singleton( ADDRESS_2 ) );
verify( pool.getPool( ADDRESS_1 ), never() ).close();
@@ -219,7 +220,7 @@ private ConnectionPoolImpl newPool() throws Exception
DEV_NULL_LOGGING, clock );
PoolSettings poolSettings = newSettings();
Bootstrap bootstrap = BootstrapFactory.newBootstrap( 1 );
- return new ConnectionPoolImpl( connector, bootstrap, poolSettings, DEV_NULL_LOGGING, clock );
+ return new ConnectionPoolImpl( connector, bootstrap, poolSettings, DEV_NULL_METRICS, DEV_NULL_LOGGING, clock );
}
private static PoolSettings newSettings()
@@ -231,10 +232,10 @@ private static class TestConnectionPool extends ConnectionPoolImpl
{
final Map channelPoolsByAddress = new HashMap<>();
- TestConnectionPool( ActiveChannelTracker activeChannelTracker )
+ TestConnectionPool( NettyChannelTracker nettyChannelTracker )
{
- super( mock( ChannelConnector.class ), mock( Bootstrap.class ), activeChannelTracker, newSettings(),
- DEV_NULL_LOGGING, new FakeClock() );
+ super( mock( ChannelConnector.class ), mock( Bootstrap.class ), nettyChannelTracker, newSettings(),
+ DEV_NULL_METRICS, DEV_NULL_LOGGING, new FakeClock() );
}
ChannelPool getPool( BoltServerAddress address )
diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelPoolTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelPoolTest.java
index dd640e49d2..93ce7774c7 100644
--- a/driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelPoolTest.java
+++ b/driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelPoolTest.java
@@ -21,7 +21,6 @@
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.pool.ChannelHealthChecker;
-import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.util.concurrent.Future;
import org.junit.After;
import org.junit.Before;
@@ -58,6 +57,7 @@
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
+import static org.neo4j.driver.internal.metrics.InternalAbstractMetrics.DEV_NULL_METRICS;
import static org.neo4j.driver.v1.Values.value;
public class NettyChannelPoolTest
@@ -66,14 +66,14 @@ public class NettyChannelPoolTest
public final TestNeo4j neo4j = new TestNeo4j();
private Bootstrap bootstrap;
- private ChannelPoolHandler poolHandler;
+ private NettyChannelTracker poolHandler;
private NettyChannelPool pool;
@Before
public void setUp()
{
bootstrap = BootstrapFactory.newBootstrap( 1 );
- poolHandler = mock( ChannelPoolHandler.class );
+ poolHandler = mock( NettyChannelTracker.class );
}
@After
@@ -183,24 +183,24 @@ public void shouldLimitNumberOfConcurrentConnections() throws Exception
@Test
public void shouldTrackActiveChannels() throws Exception
{
- ActiveChannelTracker activeChannelTracker = new ActiveChannelTracker( DEV_NULL_LOGGING );
+ NettyChannelTracker tracker = new NettyChannelTracker( DEV_NULL_METRICS, DEV_NULL_LOGGING );
- poolHandler = activeChannelTracker;
+ poolHandler = tracker;
pool = newPool( neo4j.authToken() );
Channel channel1 = acquire( pool );
Channel channel2 = acquire( pool );
Channel channel3 = acquire( pool );
- assertEquals( 3, activeChannelTracker.activeChannelCount( neo4j.address() ) );
+ assertEquals( 3, tracker.inUseChannelCount( neo4j.address() ) );
release( channel1 );
release( channel2 );
release( channel3 );
- assertEquals( 0, activeChannelTracker.activeChannelCount( neo4j.address() ) );
+ assertEquals( 0, tracker.inUseChannelCount( neo4j.address() ) );
assertNotNull( acquire( pool ) );
assertNotNull( acquire( pool ) );
- assertEquals( 2, activeChannelTracker.activeChannelCount( neo4j.address() ) );
+ assertEquals( 2, tracker.inUseChannelCount( neo4j.address() ) );
}
private NettyChannelPool newPool( AuthToken authToken )
diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelTrackerTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelTrackerTest.java
new file mode 100644
index 0000000000..122478543e
--- /dev/null
+++ b/driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelTrackerTest.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright (c) 2002-2018 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal.async.pool;
+
+import io.netty.channel.Channel;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.junit.Test;
+
+import org.neo4j.driver.internal.BoltServerAddress;
+
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.neo4j.driver.internal.async.ChannelAttributes.setServerAddress;
+import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
+import static org.neo4j.driver.internal.metrics.InternalAbstractMetrics.DEV_NULL_METRICS;
+
+public class NettyChannelTrackerTest
+{
+ private final BoltServerAddress address = BoltServerAddress.LOCAL_DEFAULT;
+ private final NettyChannelTracker tracker = new NettyChannelTracker( DEV_NULL_METRICS, DEV_NULL_LOGGING );
+
+ @Test
+ public void shouldIncrementInUseCountWhenChannelCreated()
+ {
+ Channel channel = newChannel();
+ assertEquals( 0, tracker.inUseChannelCount( address ) );
+ assertEquals( 0, tracker.idleChannelCount( address ) );
+
+ tracker.channelCreated( channel );
+ assertEquals( 1, tracker.inUseChannelCount( address ) );
+ assertEquals( 0, tracker.idleChannelCount( address ) );
+ }
+
+ @Test
+ public void shouldIncrementInUseCountWhenChannelAcquired()
+ {
+ Channel channel = newChannel();
+ assertEquals( 0, tracker.inUseChannelCount( address ) );
+ assertEquals( 0, tracker.idleChannelCount( address ) );
+
+ tracker.channelCreated( channel );
+ assertEquals( 1, tracker.inUseChannelCount( address ) );
+ assertEquals( 0, tracker.idleChannelCount( address ) );
+
+ tracker.channelReleased( channel );
+ assertEquals( 0, tracker.inUseChannelCount( address ) );
+ assertEquals( 1, tracker.idleChannelCount( address ) );
+
+ tracker.channelAcquired( channel );
+ assertEquals( 1, tracker.inUseChannelCount( address ) );
+ assertEquals( 0, tracker.idleChannelCount( address ) );
+ }
+
+ @Test
+ public void shouldIncrementInuseCountForAddress()
+ {
+ Channel channel1 = newChannel();
+ Channel channel2 = newChannel();
+ Channel channel3 = newChannel();
+
+ assertEquals( 0, tracker.inUseChannelCount( address ) );
+ tracker.channelCreated( channel1 );
+ assertEquals( 1, tracker.inUseChannelCount( address ) );
+ tracker.channelCreated( channel2 );
+ assertEquals( 2, tracker.inUseChannelCount( address ) );
+ tracker.channelCreated( channel3 );
+ assertEquals( 3, tracker.inUseChannelCount( address ) );
+ assertEquals( 0, tracker.idleChannelCount( address ) );
+ }
+
+ @Test
+ public void shouldDecrementCountForAddress()
+ {
+ Channel channel1 = newChannel();
+ Channel channel2 = newChannel();
+ Channel channel3 = newChannel();
+
+ tracker.channelCreated( channel1 );
+ tracker.channelCreated( channel2 );
+ tracker.channelCreated( channel3 );
+ assertEquals( 3, tracker.inUseChannelCount( address ) );
+ assertEquals( 0, tracker.idleChannelCount( address ) );
+
+ tracker.channelReleased( channel1 );
+ assertEquals( 2, tracker.inUseChannelCount( address ) );
+ assertEquals( 1, tracker.idleChannelCount( address ) );
+ tracker.channelReleased( channel2 );
+ assertEquals( 1, tracker.inUseChannelCount( address ) );
+ assertEquals( 2, tracker.idleChannelCount( address ) );
+ tracker.channelReleased( channel3 );
+ assertEquals( 0, tracker.inUseChannelCount( address ) );
+ assertEquals( 3, tracker.idleChannelCount( address ) );
+ }
+
+ @Test
+ public void shouldThrowWhenDecrementingForUnknownAddress()
+ {
+ Channel channel = newChannel();
+
+ try
+ {
+ tracker.channelReleased( channel );
+ fail( "Exception expected" );
+ }
+ catch ( Exception e )
+ {
+ assertThat( e, instanceOf( IllegalStateException.class ) );
+ }
+ }
+
+ @Test
+ public void shouldReturnZeroActiveCountForUnknownAddress()
+ {
+ assertEquals( 0, tracker.inUseChannelCount( address ) );
+ }
+
+ private Channel newChannel()
+ {
+ EmbeddedChannel channel = new EmbeddedChannel();
+ setServerAddress( channel, address );
+ return channel;
+ }
+}
diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LeastConnectedLoadBalancingStrategyTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LeastConnectedLoadBalancingStrategyTest.java
index 6b1f39a8bb..aba9e7b478 100644
--- a/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LeastConnectedLoadBalancingStrategyTest.java
+++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LeastConnectedLoadBalancingStrategyTest.java
@@ -85,7 +85,7 @@ public void shouldHandleSingleWriterWithoutActiveConnections()
public void shouldHandleSingleReaderWithActiveConnections()
{
BoltServerAddress address = new BoltServerAddress( "reader", 9999 );
- when( connectionPool.activeConnections( address ) ).thenReturn( 42 );
+ when( connectionPool.inUseConnections( address ) ).thenReturn( 42 );
assertEquals( address, strategy.selectReader( new BoltServerAddress[]{address} ) );
}
@@ -94,7 +94,7 @@ public void shouldHandleSingleReaderWithActiveConnections()
public void shouldHandleSingleWriterWithActiveConnections()
{
BoltServerAddress address = new BoltServerAddress( "writer", 9999 );
- when( connectionPool.activeConnections( address ) ).thenReturn( 24 );
+ when( connectionPool.inUseConnections( address ) ).thenReturn( 24 );
assertEquals( address, strategy.selectWriter( new BoltServerAddress[]{address} ) );
}
@@ -106,9 +106,9 @@ public void shouldHandleMultipleReadersWithActiveConnections()
BoltServerAddress address2 = new BoltServerAddress( "reader", 2 );
BoltServerAddress address3 = new BoltServerAddress( "reader", 3 );
- when( connectionPool.activeConnections( address1 ) ).thenReturn( 3 );
- when( connectionPool.activeConnections( address2 ) ).thenReturn( 4 );
- when( connectionPool.activeConnections( address3 ) ).thenReturn( 1 );
+ when( connectionPool.inUseConnections( address1 ) ).thenReturn( 3 );
+ when( connectionPool.inUseConnections( address2 ) ).thenReturn( 4 );
+ when( connectionPool.inUseConnections( address3 ) ).thenReturn( 1 );
assertEquals( address3, strategy.selectReader( new BoltServerAddress[]{address1, address2, address3} ) );
}
@@ -121,10 +121,10 @@ public void shouldHandleMultipleWritersWithActiveConnections()
BoltServerAddress address3 = new BoltServerAddress( "writer", 3 );
BoltServerAddress address4 = new BoltServerAddress( "writer", 4 );
- when( connectionPool.activeConnections( address1 ) ).thenReturn( 5 );
- when( connectionPool.activeConnections( address2 ) ).thenReturn( 6 );
- when( connectionPool.activeConnections( address3 ) ).thenReturn( 0 );
- when( connectionPool.activeConnections( address4 ) ).thenReturn( 1 );
+ when( connectionPool.inUseConnections( address1 ) ).thenReturn( 5 );
+ when( connectionPool.inUseConnections( address2 ) ).thenReturn( 6 );
+ when( connectionPool.inUseConnections( address3 ) ).thenReturn( 0 );
+ when( connectionPool.inUseConnections( address4 ) ).thenReturn( 1 );
assertEquals( address3,
strategy.selectWriter( new BoltServerAddress[]{address1, address2, address3, address4} ) );
@@ -182,7 +182,7 @@ public void shouldTraceLogSelectedAddress()
Logger logger = mock( Logger.class );
when( logging.getLog( anyString() ) ).thenReturn( logger );
- when( connectionPool.activeConnections( any( BoltServerAddress.class ) ) ).thenReturn( 42 );
+ when( connectionPool.inUseConnections( any( BoltServerAddress.class ) ) ).thenReturn( 42 );
LoadBalancingStrategy strategy = new LeastConnectedLoadBalancingStrategy( connectionPool, logging );
diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java
index d1c85ff462..47fc02b1b6 100644
--- a/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java
+++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java
@@ -171,9 +171,9 @@ public void shouldSelectLeastConnectedAddress()
{
ConnectionPool connectionPool = newConnectionPoolMock();
- when( connectionPool.activeConnections( A ) ).thenReturn( 0 );
- when( connectionPool.activeConnections( B ) ).thenReturn( 20 );
- when( connectionPool.activeConnections( C ) ).thenReturn( 0 );
+ when( connectionPool.inUseConnections( A ) ).thenReturn( 0 );
+ when( connectionPool.inUseConnections( B ) ).thenReturn( 20 );
+ when( connectionPool.inUseConnections( C ) ).thenReturn( 0 );
RoutingTable routingTable = mock( RoutingTable.class );
AddressSet readerAddresses = mock( AddressSet.class );
diff --git a/driver/src/test/java/org/neo4j/driver/internal/metrics/InternalHistogramTest.java b/driver/src/test/java/org/neo4j/driver/internal/metrics/InternalHistogramTest.java
new file mode 100644
index 0000000000..66e17ae2cb
--- /dev/null
+++ b/driver/src/test/java/org/neo4j/driver/internal/metrics/InternalHistogramTest.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright (c) 2002-2018 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal.metrics;
+
+import org.junit.Test;
+
+import org.neo4j.driver.internal.metrics.spi.Histogram;
+
+import static java.lang.Math.abs;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.Assert.assertThat;
+
+public class InternalHistogramTest
+{
+ @Test
+ public void shouldRecordSmallValuesPrecisely() throws Throwable
+ {
+ // Given
+ InternalHistogram histogram = new InternalHistogram();
+ histogram.recordValue( 0 );
+ histogram.recordValue( 1 );
+ histogram.recordValue( 2 );
+ histogram.recordValue( 3 );
+ histogram.recordValue( 4 );
+
+ // When
+ assertThat( histogram.min(), equalTo( 0L ) );
+ assertThat( histogram.max(), equalTo( 4L ) );
+ assertThat( histogram.mean(), equalTo( 2.0 ) );
+ assertThat( histogram.totalCount(), equalTo( 5L ) );
+ }
+
+ @Test
+ public void shouldRecordBigValuesPrecisely() throws Throwable
+ {
+ // Given
+ InternalHistogram histogram = new InternalHistogram();
+ histogram.recordValue( 0 );
+ histogram.recordValue( 100000 );
+ histogram.recordValue( 200000 );
+ histogram.recordValue( 300000 );
+ histogram.recordValue( 400000 );
+
+ // When
+ assertThat( histogram.min(), equalTo( 0L ) );
+ assertThat( abs( histogram.max() - 400000L ), lessThan( 500L ) );
+ assertThat( abs( histogram.mean() - 200000.0 ), lessThan( 500.0 ) );
+ assertThat( histogram.totalCount(), equalTo( 5L ) );
+ }
+
+ @Test
+ public void shouldResetOnOriginalHistogram() throws Throwable
+ {
+ // Given
+ InternalHistogram histogram = new InternalHistogram();
+ histogram.recordValue( 0 );
+ histogram.recordValue( 1 );
+ histogram.recordValue( 2 );
+ histogram.recordValue( 3 );
+ histogram.recordValue( 4 );
+
+ // When
+ assertThat( histogram.totalCount(), equalTo( 5L ) );
+ Histogram snapshot = histogram.snapshot();
+ snapshot.reset();
+
+ // Then
+ assertThat( histogram.totalCount(), equalTo( 0L ) );
+ }
+
+}
diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/ChannelTrackingDriverFactory.java b/driver/src/test/java/org/neo4j/driver/internal/util/ChannelTrackingDriverFactory.java
index cacd88711b..7c7980cc9d 100644
--- a/driver/src/test/java/org/neo4j/driver/internal/util/ChannelTrackingDriverFactory.java
+++ b/driver/src/test/java/org/neo4j/driver/internal/util/ChannelTrackingDriverFactory.java
@@ -29,6 +29,7 @@
import org.neo4j.driver.internal.ConnectionSettings;
import org.neo4j.driver.internal.async.BootstrapFactory;
import org.neo4j.driver.internal.async.ChannelConnector;
+import org.neo4j.driver.internal.metrics.MetricsListener;
import org.neo4j.driver.internal.security.SecurityPlan;
import org.neo4j.driver.internal.spi.ConnectionPool;
import org.neo4j.driver.v1.AuthToken;
@@ -70,10 +71,10 @@ protected final ChannelConnector createConnector( ConnectionSettings settings, S
}
@Override
- protected final ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan,
- Bootstrap bootstrap, Config config )
+ protected final ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap, MetricsListener metrics,
+ Config config )
{
- pool = super.createConnectionPool( authToken, securityPlan, bootstrap, config );
+ pool = super.createConnectionPool( authToken, securityPlan, bootstrap, metrics, config );
return pool;
}
@@ -102,6 +103,6 @@ public List pollChannels()
public int activeChannels( BoltServerAddress address )
{
- return pool == null ? 0 : pool.activeConnections( address );
+ return pool == null ? 0 : pool.inUseConnections( address );
}
}
diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/FailingConnectionDriverFactory.java b/driver/src/test/java/org/neo4j/driver/internal/util/FailingConnectionDriverFactory.java
index cb7ec6cba2..cdb4e27305 100644
--- a/driver/src/test/java/org/neo4j/driver/internal/util/FailingConnectionDriverFactory.java
+++ b/driver/src/test/java/org/neo4j/driver/internal/util/FailingConnectionDriverFactory.java
@@ -27,6 +27,7 @@
import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.DriverFactory;
+import org.neo4j.driver.internal.metrics.MetricsListener;
import org.neo4j.driver.internal.security.SecurityPlan;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.spi.ConnectionPool;
@@ -40,10 +41,9 @@ public class FailingConnectionDriverFactory extends DriverFactory
private final AtomicReference nextRunFailure = new AtomicReference<>();
@Override
- protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap,
- Config config )
+ protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap, MetricsListener metrics, Config config )
{
- ConnectionPool pool = super.createConnectionPool( authToken, securityPlan, bootstrap, config );
+ ConnectionPool pool = super.createConnectionPool( authToken, securityPlan, bootstrap, metrics, config );
return new ConnectionPoolWithFailingConnections( pool, nextRunFailure );
}
@@ -77,9 +77,15 @@ public void retainAll( Set addressesToRetain )
}
@Override
- public int activeConnections( BoltServerAddress address )
+ public int inUseConnections( BoltServerAddress address )
{
- return delegate.activeConnections( address );
+ return delegate.inUseConnections( address );
+ }
+
+ @Override
+ public int idleConnections( BoltServerAddress address )
+ {
+ return delegate.idleConnections( address );
}
@Override
@@ -87,6 +93,12 @@ public CompletionStage close()
{
return delegate.close();
}
+
+ @Override
+ public boolean isOpen()
+ {
+ return delegate.isOpen();
+ }
}
private static class FailingConnection implements Connection
diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/ConnectionHandlingIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/ConnectionHandlingIT.java
index 93ff0cc96d..f2d61d885b 100644
--- a/driver/src/test/java/org/neo4j/driver/v1/integration/ConnectionHandlingIT.java
+++ b/driver/src/test/java/org/neo4j/driver/v1/integration/ConnectionHandlingIT.java
@@ -36,6 +36,7 @@
import org.neo4j.driver.internal.async.pool.ConnectionPoolImpl;
import org.neo4j.driver.internal.async.pool.PoolSettings;
import org.neo4j.driver.internal.cluster.RoutingSettings;
+import org.neo4j.driver.internal.metrics.MetricsListener;
import org.neo4j.driver.internal.retry.RetrySettings;
import org.neo4j.driver.internal.security.SecurityPlan;
import org.neo4j.driver.internal.spi.Connection;
@@ -64,6 +65,7 @@
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
+import static org.neo4j.driver.internal.metrics.InternalAbstractMetrics.DEV_NULL_METRICS;
import static org.neo4j.driver.v1.Config.defaultConfig;
import static org.neo4j.driver.v1.Values.parameters;
import static org.neo4j.driver.v1.util.TestUtil.await;
@@ -294,8 +296,7 @@ private static class DriverFactoryWithConnectionPool extends DriverFactory
MemorizingConnectionPool connectionPool;
@Override
- protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan,
- Bootstrap bootstrap, Config config )
+ protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap, MetricsListener metrics, Config config )
{
ConnectionSettings connectionSettings = new ConnectionSettings( authToken, 1000 );
PoolSettings poolSettings = new PoolSettings( config.maxConnectionPoolSize(),
@@ -317,7 +318,7 @@ private static class MemorizingConnectionPool extends ConnectionPoolImpl
MemorizingConnectionPool( ChannelConnector connector, Bootstrap bootstrap, PoolSettings settings,
Logging logging, Clock clock )
{
- super( connector, bootstrap, settings, logging, clock );
+ super( connector, bootstrap, settings, DEV_NULL_METRICS, logging, clock );
}
diff --git a/driver/src/test/java/org/neo4j/driver/v1/stress/AbstractStressTestBase.java b/driver/src/test/java/org/neo4j/driver/v1/stress/AbstractStressTestBase.java
index cc2377c3aa..af894c913b 100644
--- a/driver/src/test/java/org/neo4j/driver/v1/stress/AbstractStressTestBase.java
+++ b/driver/src/test/java/org/neo4j/driver/v1/stress/AbstractStressTestBase.java
@@ -44,6 +44,7 @@
import java.util.function.Function;
import java.util.logging.Level;
+import org.neo4j.driver.internal.InternalDriver;
import org.neo4j.driver.internal.logging.ConsoleLogging;
import org.neo4j.driver.internal.logging.DevNullLogger;
import org.neo4j.driver.internal.util.Futures;
@@ -73,6 +74,7 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+import static org.neo4j.driver.internal.metrics.spi.Metrics.DRIVER_METRICS_ENABLED_KEY;
public abstract class AbstractStressTestBase
{
@@ -87,11 +89,12 @@ public abstract class AbstractStressTestBase
private LoggerNameTrackingLogging logging;
private ExecutorService executor;
- Driver driver;
+ InternalDriver driver;
@Before
public void setUp()
{
+ System.setProperty( DRIVER_METRICS_ENABLED_KEY, "true" );
logging = new LoggerNameTrackingLogging();
Config config = Config.build()
@@ -100,7 +103,8 @@ public void setUp()
.withConnectionAcquisitionTimeout( 1, MINUTES )
.toConfig();
- driver = GraphDatabase.driver( databaseUri(), authToken(), config );
+ driver = (InternalDriver) GraphDatabase.driver( databaseUri(), authToken(), config );
+ System.setProperty( DRIVER_METRICS_ENABLED_KEY, "false" );
ThreadFactory threadFactory = new DaemonThreadFactory( getClass().getSimpleName() + "-worker-" );
executor = Executors.newCachedThreadPool( threadFactory );
@@ -109,10 +113,12 @@ public void setUp()
@After
public void tearDown()
{
+ System.out.println( driver.metrics() );
executor.shutdownNow();
if ( driver != null )
{
driver.close();
+ System.out.println( driver.metrics() );
}
}
diff --git a/pom.xml b/pom.xml
index ad103e574f..ddcd87b37f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -56,6 +56,11 @@
netty-handler
4.1.16.Final
+