Skip to content

Commit 9b217c8

Browse files
author
Zhen
committed
Draft of driver metrics
Added Basics of Driver metrics and Connection pool metrics. The driver metrics are enabled with system property `driver.metrics.enabled=true` TODO: Histgram and ConnectionMetrics
1 parent 14ba9d1 commit 9b217c8

29 files changed

+1053
-199
lines changed

driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@
3838
import org.neo4j.driver.internal.cluster.loadbalancing.LoadBalancingStrategy;
3939
import org.neo4j.driver.internal.cluster.loadbalancing.RoundRobinLoadBalancingStrategy;
4040
import org.neo4j.driver.internal.logging.NettyLogging;
41+
import org.neo4j.driver.internal.metrics.DriverMetricsHandler;
42+
import org.neo4j.driver.internal.metrics.InternalAbstractDriverMetrics;
43+
import org.neo4j.driver.internal.metrics.InternalDriverMetrics;
4144
import org.neo4j.driver.internal.retry.ExponentialBackoffRetryLogic;
4245
import org.neo4j.driver.internal.retry.RetryLogic;
4346
import org.neo4j.driver.internal.retry.RetrySettings;
@@ -56,6 +59,8 @@
5659
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
5760

5861
import static java.lang.String.format;
62+
import static org.neo4j.driver.internal.metrics.InternalAbstractDriverMetrics.DEV_NULL_METRICS;
63+
import static org.neo4j.driver.internal.metrics.spi.DriverMetrics.isDriverMetricsEnabled;
5964
import static org.neo4j.driver.internal.security.SecurityPlan.insecure;
6065

6166
public class DriverFactory
@@ -77,18 +82,19 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
7782
EventExecutorGroup eventExecutorGroup = bootstrap.config().group();
7883
RetryLogic retryLogic = createRetryLogic( retrySettings, eventExecutorGroup, config.logging() );
7984

80-
ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, bootstrap, config );
85+
InternalAbstractDriverMetrics metrics = createDriverMetrics( config );
86+
ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, bootstrap, metrics, config );
8187

8288
InternalDriver driver = createDriver( uri, address, connectionPool, config, newRoutingSettings,
8389
eventExecutorGroup, securityPlan, retryLogic );
8490

91+
driver.driverMetrics( metrics );
8592
verifyConnectivity( driver, connectionPool, config );
8693

8794
return driver;
8895
}
8996

90-
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan,
91-
Bootstrap bootstrap, Config config )
97+
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Bootstrap bootstrap, DriverMetricsHandler metrics, Config config )
9298
{
9399
Clock clock = createClock();
94100
ConnectionSettings settings = new ConnectionSettings( authToken, config.connectionTimeoutMillis() );
@@ -97,7 +103,19 @@ protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan
97103
config.connectionAcquisitionTimeoutMillis(), config.maxConnectionLifetimeMillis(),
98104
config.idleTimeBeforeConnectionTest()
99105
);
100-
return new ConnectionPoolImpl( connector, bootstrap, poolSettings, config.logging(), clock );
106+
return new ConnectionPoolImpl( connector, bootstrap, poolSettings, metrics, config.logging(), clock );
107+
}
108+
109+
protected static InternalAbstractDriverMetrics createDriverMetrics( Config config )
110+
{
111+
if( isDriverMetricsEnabled() )
112+
{
113+
return new InternalDriverMetrics( config );
114+
}
115+
else
116+
{
117+
return DEV_NULL_METRICS;
118+
}
101119
}
102120

103121
protected ChannelConnector createConnector( ConnectionSettings settings, SecurityPlan securityPlan,

driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.concurrent.CompletionStage;
2222
import java.util.concurrent.atomic.AtomicBoolean;
2323

24+
import org.neo4j.driver.internal.metrics.spi.DriverMetrics;
2425
import org.neo4j.driver.internal.security.SecurityPlan;
2526
import org.neo4j.driver.internal.util.Futures;
2627
import org.neo4j.driver.v1.AccessMode;
@@ -38,6 +39,7 @@ public class InternalDriver implements Driver
3839
private final Logger log;
3940

4041
private AtomicBoolean closed = new AtomicBoolean( false );
42+
private DriverMetrics driverMetrics;
4143

4244
InternalDriver( SecurityPlan securityPlan, SessionFactory sessionFactory, Logging logging )
4345
{
@@ -144,6 +146,16 @@ private void assertOpen()
144146
}
145147
}
146148

149+
public DriverMetrics driverMetrics()
150+
{
151+
return this.driverMetrics;
152+
}
153+
154+
void driverMetrics( DriverMetrics driverMetrics )
155+
{
156+
this.driverMetrics = driverMetrics;
157+
}
158+
147159
private static RuntimeException driverCloseException()
148160
{
149161
return new IllegalStateException( "This driver instance has already been closed" );

driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java

Lines changed: 53 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@
3636
import org.neo4j.driver.internal.BoltServerAddress;
3737
import org.neo4j.driver.internal.async.ChannelConnector;
3838
import org.neo4j.driver.internal.async.NettyConnection;
39+
import org.neo4j.driver.internal.metrics.DriverMetricsHandler;
40+
import org.neo4j.driver.internal.metrics.ListenerEvent;
41+
import org.neo4j.driver.internal.metrics.SimpleTimerListenerEvent;
3942
import org.neo4j.driver.internal.spi.Connection;
4043
import org.neo4j.driver.internal.spi.ConnectionPool;
4144
import org.neo4j.driver.internal.util.Clock;
@@ -48,29 +51,31 @@ public class ConnectionPoolImpl implements ConnectionPool
4851
{
4952
private final ChannelConnector connector;
5053
private final Bootstrap bootstrap;
51-
private final ActiveChannelTracker activeChannelTracker;
54+
private final NettyChannelTracker nettyChannelTracker;
5255
private final NettyChannelHealthChecker channelHealthChecker;
5356
private final PoolSettings settings;
5457
private final Clock clock;
5558
private final Logger log;
59+
private DriverMetricsHandler driverMetrics;
5660

5761
private final ConcurrentMap<BoltServerAddress,ChannelPool> pools = new ConcurrentHashMap<>();
5862
private final AtomicBoolean closed = new AtomicBoolean();
5963

6064
public ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, PoolSettings settings,
61-
Logging logging, Clock clock )
65+
DriverMetricsHandler metricsHandler, Logging logging, Clock clock )
6266
{
63-
this( connector, bootstrap, new ActiveChannelTracker( logging ), settings, logging, clock );
67+
this( connector, bootstrap, new NettyChannelTracker( metricsHandler, logging ), settings, metricsHandler, logging, clock );
6468
}
6569

66-
ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, ActiveChannelTracker activeChannelTracker,
67-
PoolSettings settings, Logging logging, Clock clock )
70+
ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, NettyChannelTracker nettyChannelTracker,
71+
PoolSettings settings, DriverMetricsHandler driverMetrics, Logging logging, Clock clock )
6872
{
6973
this.connector = connector;
7074
this.bootstrap = bootstrap;
71-
this.activeChannelTracker = activeChannelTracker;
75+
this.nettyChannelTracker = nettyChannelTracker;
7276
this.channelHealthChecker = new NettyChannelHealthChecker( settings, clock, logging );
7377
this.settings = settings;
78+
this.driverMetrics = driverMetrics;
7479
this.clock = clock;
7580
this.log = logging.getLog( ConnectionPool.class.getSimpleName() );
7681
}
@@ -80,15 +85,26 @@ public CompletionStage<Connection> acquire( BoltServerAddress address )
8085
{
8186
log.trace( "Acquiring a connection from pool towards %s", address );
8287

88+
// TODO no need to init if no driver metrics
89+
ListenerEvent acquireEvent = new SimpleTimerListenerEvent();
90+
8391
assertNotClosed();
8492
ChannelPool pool = getOrCreatePool( address );
93+
driverMetrics.beforeAcquiring( address, acquireEvent );
8594
Future<Channel> connectionFuture = pool.acquire();
8695

8796
return Futures.asCompletionStage( connectionFuture ).handle( ( channel, error ) ->
8897
{
89-
processAcquisitionError( error );
90-
assertNotClosed( address, channel, pool );
91-
return new NettyConnection( channel, pool, clock );
98+
try
99+
{
100+
processAcquisitionError( error );
101+
assertNotClosed( address, channel, pool );
102+
return new NettyConnection( channel, pool, clock );
103+
}
104+
finally
105+
{
106+
driverMetrics.afterAcquired( address, acquireEvent );
107+
}
92108
} );
93109
}
94110

@@ -99,7 +115,7 @@ public void retainAll( Set<BoltServerAddress> addressesToRetain )
99115
{
100116
if ( !addressesToRetain.contains( address ) )
101117
{
102-
int activeChannels = activeChannelTracker.activeChannelCount( address );
118+
int activeChannels = nettyChannelTracker.inUseChannelCount( address );
103119
if ( activeChannels == 0 )
104120
{
105121
// address is not present in updated routing table and has no active connections
@@ -118,9 +134,15 @@ public void retainAll( Set<BoltServerAddress> addressesToRetain )
118134
}
119135

120136
@Override
121-
public int activeConnections( BoltServerAddress address )
137+
public int inUseConnections( BoltServerAddress address )
138+
{
139+
return nettyChannelTracker.inUseChannelCount( address );
140+
}
141+
142+
@Override
143+
public int idleConnections( BoltServerAddress address )
122144
{
123-
return activeChannelTracker.activeChannelCount( address );
145+
return nettyChannelTracker.idleChannelCount( address );
124146
}
125147

126148
@Override
@@ -150,6 +172,12 @@ public CompletionStage<Void> close()
150172
.thenApply( ignore -> null );
151173
}
152174

175+
@Override
176+
public boolean isOpen()
177+
{
178+
return !closed.get();
179+
}
180+
153181
private ChannelPool getOrCreatePool( BoltServerAddress address )
154182
{
155183
ChannelPool pool = pools.get( address );
@@ -163,13 +191,19 @@ private ChannelPool getOrCreatePool( BoltServerAddress address )
163191
pool.close();
164192
return getOrCreatePool( address );
165193
}
194+
else
195+
{
196+
// We added a new pool as a result we add a new metrics for the pool too
197+
driverMetrics.addPoolMetrics( address, this );
198+
}
166199
}
200+
167201
return pool;
168202
}
169203

170204
ChannelPool newPool( BoltServerAddress address )
171205
{
172-
return new NettyChannelPool( address, connector, bootstrap, activeChannelTracker, channelHealthChecker,
206+
return new NettyChannelPool( address, connector, bootstrap, nettyChannelTracker, channelHealthChecker,
173207
settings.connectionAcquisitionTimeout(), settings.maxConnectionPoolSize() );
174208
}
175209

@@ -217,4 +251,10 @@ private void assertNotClosed( BoltServerAddress address, Channel channel, Channe
217251
assertNotClosed();
218252
}
219253
}
254+
255+
@Override
256+
public String toString()
257+
{
258+
return "ConnectionPoolImpl{" + "pools=" + pools + '}';
259+
}
220260
}

driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelPool.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919
package org.neo4j.driver.internal.async.pool;
2020

2121
import io.netty.bootstrap.Bootstrap;
22+
import io.netty.channel.Channel;
2223
import io.netty.channel.ChannelFuture;
2324
import io.netty.channel.pool.ChannelHealthChecker;
24-
import io.netty.channel.pool.ChannelPoolHandler;
2525
import io.netty.channel.pool.FixedChannelPool;
2626

2727
import org.neo4j.driver.internal.BoltServerAddress;
@@ -42,28 +42,37 @@ public class NettyChannelPool extends FixedChannelPool
4242

4343
private final BoltServerAddress address;
4444
private final ChannelConnector connector;
45+
private final NettyChannelTracker handler;
4546

4647
public NettyChannelPool( BoltServerAddress address, ChannelConnector connector, Bootstrap bootstrap,
47-
ChannelPoolHandler handler, ChannelHealthChecker healthCheck, long acquireTimeoutMillis,
48+
NettyChannelTracker handler, ChannelHealthChecker healthCheck, long acquireTimeoutMillis,
4849
int maxConnections )
4950
{
5051
super( bootstrap, handler, healthCheck, AcquireTimeoutAction.FAIL, acquireTimeoutMillis, maxConnections,
5152
MAX_PENDING_ACQUIRES, RELEASE_HEALTH_CHECK );
5253

5354
this.address = requireNonNull( address );
5455
this.connector = requireNonNull( connector );
56+
this.handler = requireNonNull( handler );
5557
}
5658

5759
@Override
5860
protected ChannelFuture connectChannel( Bootstrap bootstrap )
5961
{
62+
handler.channelCreating( address );
6063
ChannelFuture channelFuture = connector.connect( address, bootstrap );
6164
channelFuture.addListener( future ->
6265
{
6366
if ( future.isSuccess() )
6467
{
6568
// notify pool handler about a successful connection
66-
handler().channelCreated( channelFuture.channel() );
69+
Channel channel = channelFuture.channel();
70+
handler.channelCreated( channel );
71+
channel.closeFuture().addListener( closeFuture -> handler.channelClosed( channel ) );
72+
}
73+
else
74+
{
75+
handler.channelFailedToCreate( address );
6776
}
6877
} );
6978
return channelFuture;

0 commit comments

Comments
 (0)