Skip to content

Commit 82e8c16

Browse files
authored
Merge pull request #471 from zhenlineo/1.6-metrics-enhance
Metrics Enhancement
2 parents be4cd1d + c889fa1 commit 82e8c16

19 files changed

+217
-105
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public NettyConnection( Channel channel, ChannelPool channelPool, Clock clock, M
7070
this.clock = clock;
7171
this.metricsListener = metricsListener;
7272
this.inUseEvent = metricsListener.createListenerEvent();
73-
metricsListener.afterAcquiredOrCreated( this.serverAddress, this.inUseEvent );
73+
metricsListener.afterConnectionCreated( this.serverAddress, this.inUseEvent );
7474
}
7575

7676
@Override
@@ -131,11 +131,11 @@ public CompletionStage<Void> release()
131131
{
132132
if ( status.compareAndSet( Status.OPEN, Status.RELEASED ) )
133133
{
134-
metricsListener.afterReleased( this.serverAddress, this.inUseEvent );
135134
ChannelReleasingResetResponseHandler handler = new ChannelReleasingResetResponseHandler( channel,
136135
channelPool, messageDispatcher, clock, releaseFuture );
137136

138137
writeResetMessageIfNeeded( handler, false );
138+
metricsListener.afterConnectionReleased( this.serverAddress, this.inUseEvent );
139139
}
140140
return releaseFuture;
141141
}
@@ -145,11 +145,11 @@ public void terminateAndRelease( String reason )
145145
{
146146
if ( status.compareAndSet( Status.OPEN, Status.TERMINATED ) )
147147
{
148-
metricsListener.afterReleased( this.serverAddress, this.inUseEvent );
149148
setTerminationReason( channel, reason );
150149
channel.close();
151150
channelPool.release( channel );
152151
releaseFuture.complete( null );
152+
metricsListener.afterConnectionReleased( this.serverAddress, this.inUseEvent );
153153
}
154154
}
155155

driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -95,13 +95,16 @@ public CompletionStage<Connection> acquire( BoltServerAddress address )
9595
{
9696
try
9797
{
98-
processAcquisitionError( error );
98+
processAcquisitionError( address, error );
9999
assertNotClosed( address, channel, pool );
100-
return new NettyConnection( channel, pool, clock, metricsListener );
100+
NettyConnection nettyConnection = new NettyConnection( channel, pool, clock, metricsListener );
101+
102+
metricsListener.afterAcquiredOrCreated( address, acquireEvent );
103+
return nettyConnection;
101104
}
102105
finally
103106
{
104-
metricsListener.afterAcquiringOrCreating( address, acquireEvent );
107+
metricsListener.afterAcquiringOrCreating( address );
105108
}
106109
} );
107110
}
@@ -171,9 +174,9 @@ public CompletionStage<Void> close()
171174
}
172175

173176
@Override
174-
public boolean isOpen()
177+
public boolean isOpen( BoltServerAddress address )
175178
{
176-
return !closed.get();
179+
return pools.containsKey( address );
177180
}
178181

179182
private ChannelPool getOrCreatePool( BoltServerAddress address )
@@ -210,7 +213,7 @@ private EventLoopGroup eventLoopGroup()
210213
return bootstrap.config().group();
211214
}
212215

213-
private void processAcquisitionError( Throwable error )
216+
private void processAcquisitionError( BoltServerAddress serverAddress, Throwable error )
214217
{
215218
Throwable cause = Futures.completionExceptionCause( error );
216219
if ( cause != null )
@@ -219,6 +222,7 @@ private void processAcquisitionError( Throwable error )
219222
{
220223
// NettyChannelPool returns future failed with TimeoutException if acquire operation takes more than
221224
// configured time, translate this exception to a prettier one and re-throw
225+
metricsListener.afterTimedOutToAcquireOrCreate( serverAddress );
222226
throw new ClientException(
223227
"Unable to acquire connection from the pool within configured maximum time of " +
224228
settings.connectionAcquisitionTimeout() + "ms" );

driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelPool.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,22 +60,21 @@ public NettyChannelPool( BoltServerAddress address, ChannelConnector connector,
6060
@Override
6161
protected ChannelFuture connectChannel( Bootstrap bootstrap )
6262
{
63-
ListenerEvent creatingEvent = handler.beforeChannelCreating( address );
63+
ListenerEvent creatingEvent = handler.channelCreating( address );
6464
ChannelFuture channelFuture = connector.connect( address, bootstrap );
6565
channelFuture.addListener( future ->
6666
{
6767
if ( future.isSuccess() )
6868
{
6969
// notify pool handler about a successful connection
7070
Channel channel = channelFuture.channel();
71-
handler.channelCreated( channel );
71+
handler.channelCreated( channel, creatingEvent );
7272
channel.closeFuture().addListener( closeFuture -> handler.channelClosed( channel ) );
7373
}
7474
else
7575
{
7676
handler.channelFailedToCreate( address );
7777
}
78-
handler.afterChannelCreating( address, creatingEvent );
7978
} );
8079
return channelFuture;
8180
}

driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelTracker.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -65,26 +65,26 @@ public void channelAcquired( Channel channel )
6565
@Override
6666
public void channelCreated( Channel channel )
6767
{
68-
log.debug( "Channel %s created", channel );
69-
incrementInUse( channel );
70-
metricsListener.afterCreated( serverAddress( channel ) );
68+
throw new IllegalStateException( "Untraceable channel created." );
7169
}
7270

73-
public void channelFailedToCreate( BoltServerAddress address )
71+
public void channelCreated( Channel channel, ListenerEvent creatingEvent )
7472
{
75-
metricsListener.afterFailedToCreate( address );
73+
log.debug( "Channel %s created", channel );
74+
incrementInUse( channel );
75+
metricsListener.afterCreated( serverAddress( channel ), creatingEvent );
7676
}
7777

78-
public ListenerEvent beforeChannelCreating( BoltServerAddress address )
78+
public ListenerEvent channelCreating( BoltServerAddress address )
7979
{
8080
ListenerEvent creatingEvent = metricsListener.createListenerEvent();
8181
metricsListener.beforeCreating( address, creatingEvent );
8282
return creatingEvent;
8383
}
8484

85-
public void afterChannelCreating( BoltServerAddress address, ListenerEvent creatingEvent )
85+
public void channelFailedToCreate( BoltServerAddress address )
8686
{
87-
metricsListener.afterCreating( address, creatingEvent );
87+
metricsListener.afterFailedToCreate( address );
8888
}
8989

9090
public void channelClosed( Channel channel )

driver/src/main/java/org/neo4j/driver/internal/metrics/ConnectionMetricsListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public interface ConnectionMetricsListener
2323
{
2424
void beforeCreating( ListenerEvent listenerEvent );
2525

26-
void afterCreating( ListenerEvent listenerEvent );
26+
void afterCreated( ListenerEvent listenerEvent );
2727

2828
void acquiredOrCreated( ListenerEvent listenerEvent );
2929

driver/src/main/java/org/neo4j/driver/internal/metrics/ConnectionPoolMetricsListener.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ public interface ConnectionPoolMetricsListener
3030

3131
void beforeAcquiringOrCreating( ListenerEvent listenerEvent );
3232

33-
void afterAcquiringOrCreating( ListenerEvent listenerEvent );
33+
void afterAcquiringOrCreating();
34+
35+
void afterAcquiredOrCreated( ListenerEvent listenerEvent );
36+
37+
void afterTimedOutToAcquireOrCreate();
3438
}
3539

driver/src/main/java/org/neo4j/driver/internal/metrics/InternalAbstractMetrics.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,46 +35,61 @@ public abstract class InternalAbstractMetrics implements Metrics, MetricsListene
3535
@Override
3636
public void beforeCreating( BoltServerAddress serverAddress, ListenerEvent creatingEvent )
3737
{
38+
3839
}
3940

4041
@Override
41-
public void afterCreating( BoltServerAddress serverAddress, ListenerEvent creatingEvent )
42+
public void afterCreated( BoltServerAddress serverAddress, ListenerEvent creatingEvent )
4243
{
44+
4345
}
4446

4547
@Override
46-
public void afterCreated( BoltServerAddress serverAddress )
48+
public void afterFailedToCreate( BoltServerAddress serverAddress )
4749
{
50+
4851
}
4952

5053
@Override
51-
public void afterFailedToCreate( BoltServerAddress serverAddress )
54+
public void afterClosed( BoltServerAddress serverAddress )
5255
{
56+
5357
}
5458

5559
@Override
56-
public void afterClosed( BoltServerAddress serverAddress )
60+
public void afterTimedOutToAcquireOrCreate( BoltServerAddress serverAddress )
5761
{
62+
5863
}
5964

6065
@Override
6166
public void beforeAcquiringOrCreating( BoltServerAddress serverAddress, ListenerEvent acquireEvent )
6267
{
68+
6369
}
6470

6571
@Override
66-
public void afterAcquiringOrCreating( BoltServerAddress serverAddress, ListenerEvent acquireEvent )
72+
public void afterAcquiringOrCreating( BoltServerAddress serverAddress )
6773
{
74+
6875
}
6976

7077
@Override
71-
public void afterAcquiredOrCreated( BoltServerAddress serverAddress, ListenerEvent inUseEvent )
78+
public void afterAcquiredOrCreated( BoltServerAddress serverAddress, ListenerEvent acquireEvent )
7279
{
80+
7381
}
7482

7583
@Override
76-
public void afterReleased( BoltServerAddress serverAddress, ListenerEvent inUseEvent )
84+
public void afterConnectionCreated( BoltServerAddress serverAddress, ListenerEvent inUseEvent )
7785
{
86+
87+
}
88+
89+
@Override
90+
public void afterConnectionReleased( BoltServerAddress serverAddress, ListenerEvent inUseEvent )
91+
{
92+
7893
}
7994

8095
@Override
@@ -83,7 +98,6 @@ public ListenerEvent createListenerEvent()
8398
return null;
8499
}
85100

86-
87101
@Override
88102
public void addMetrics( BoltServerAddress address, ConnectionPoolImpl connectionPool )
89103
{

driver/src/main/java/org/neo4j/driver/internal/metrics/InternalConnectionMetrics.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public void beforeCreating( ListenerEvent connEvent )
6666
}
6767

6868
@Override
69-
public void afterCreating( ListenerEvent connEvent )
69+
public void afterCreated( ListenerEvent connEvent )
7070
{
7171
// finished conn creation
7272
long elapsed = connEvent.elapsed();

driver/src/main/java/org/neo4j/driver/internal/metrics/InternalConnectionPoolMetrics.java

Lines changed: 50 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,15 @@ public class InternalConnectionPoolMetrics implements ConnectionPoolMetrics, Con
3838
private final BoltServerAddress address;
3939
private final ConnectionPool pool;
4040

41-
private AtomicLong created = new AtomicLong();
42-
private AtomicLong closed = new AtomicLong();
43-
private AtomicInteger creating = new AtomicInteger();
44-
private AtomicLong failedToCreate = new AtomicLong();
41+
private final AtomicLong closed = new AtomicLong();
42+
43+
private final AtomicInteger creating = new AtomicInteger();
44+
private final AtomicLong created = new AtomicLong();
45+
private final AtomicLong failedToCreate = new AtomicLong();
46+
47+
private final AtomicInteger acquiring = new AtomicInteger();
48+
private final AtomicLong acquired = new AtomicLong();
49+
private final AtomicLong timedOutToAcquire = new AtomicLong();
4550

4651
private InternalHistogram acquisitionTimeHistogram;
4752

@@ -85,13 +90,28 @@ public void afterClosed()
8590
public void beforeAcquiringOrCreating( ListenerEvent listenerEvent )
8691
{
8792
listenerEvent.start();
93+
acquiring.incrementAndGet();
94+
}
95+
96+
@Override
97+
public void afterAcquiringOrCreating()
98+
{
99+
acquiring.decrementAndGet();
88100
}
89101

90102
@Override
91-
public void afterAcquiringOrCreating( ListenerEvent listenerEvent )
103+
public void afterAcquiredOrCreated( ListenerEvent listenerEvent )
92104
{
93105
long elapsed = listenerEvent.elapsed();
94106
acquisitionTimeHistogram.recordValue( elapsed );
107+
108+
this.acquired.incrementAndGet();
109+
}
110+
111+
@Override
112+
public void afterTimedOutToAcquireOrCreate()
113+
{
114+
this.timedOutToAcquire.incrementAndGet();
95115
}
96116

97117
@Override
@@ -103,13 +123,13 @@ public String uniqueName()
103123
@Override
104124
public PoolStatus poolStatus()
105125
{
106-
if ( pool.isOpen() )
126+
if ( pool.isOpen( address ) )
107127
{
108-
return PoolStatus.Open;
128+
return PoolStatus.OPEN;
109129
}
110130
else
111131
{
112-
return PoolStatus.Closed;
132+
return PoolStatus.CLOSED;
113133
}
114134
}
115135

@@ -143,12 +163,30 @@ public long failedToCreate()
143163
return failedToCreate.get();
144164
}
145165

166+
@Override
167+
public long timedOutToAcquire()
168+
{
169+
return timedOutToAcquire.get();
170+
}
171+
146172
@Override
147173
public long closed()
148174
{
149175
return closed.get();
150176
}
151177

178+
@Override
179+
public int acquiring()
180+
{
181+
return acquiring.get();
182+
}
183+
184+
@Override
185+
public long acquired()
186+
{
187+
return this.acquired.get();
188+
}
189+
152190
@Override
153191
public Histogram acquisitionTimeHistogram()
154192
{
@@ -158,7 +196,9 @@ public Histogram acquisitionTimeHistogram()
158196
@Override
159197
public String toString()
160198
{
161-
return format( "[created=%s, closed=%s, creating=%s, failedToCreate=%s inUse=%s, idle=%s, poolStatus=%s, acquisitionTimeHistogram=%s]", created(),
162-
closed(), creating(), failedToCreate(), inUse(), idle(), poolStatus(), acquisitionTimeHistogram() );
199+
return format( "[created=%s, closed=%s, creating=%s, failedToCreate=%s, acquiring=%s, acquired=%s, " +
200+
"timedOutToAcquire=%s, inUse=%s, idle=%s, poolStatus=%s, acquisitionTimeHistogram=%s]",
201+
created(), closed(), creating(), failedToCreate(), acquiring(), acquired(),
202+
timedOutToAcquire(), inUse(), idle(), poolStatus(), acquisitionTimeHistogram() );
163203
}
164204
}

0 commit comments

Comments
 (0)