Skip to content

Commit eb89a5e

Browse files
author
Zhen
committed
Added acquiring to describe how many application threads are waiting for a connection from the pool.
Fixed the bug where the pool status was not showing the correct status of each small pool.
1 parent 16a1b06 commit eb89a5e

File tree

9 files changed

+74
-26
lines changed

9 files changed

+74
-26
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -93,12 +93,19 @@ public CompletionStage<Connection> acquire( BoltServerAddress address )
9393

9494
return Futures.asCompletionStage( connectionFuture ).handle( ( channel, error ) ->
9595
{
96-
processAcquisitionError( address, error );
97-
assertNotClosed( address, channel, pool );
98-
NettyConnection nettyConnection = new NettyConnection( channel, pool, clock, metricsListener );
96+
try
97+
{
98+
processAcquisitionError( address, error );
99+
assertNotClosed( address, channel, pool );
100+
NettyConnection nettyConnection = new NettyConnection( channel, pool, clock, metricsListener );
99101

100-
metricsListener.afterAcquiredOrCreated( address, acquireEvent );
101-
return nettyConnection;
102+
metricsListener.afterAcquiredOrCreated( address, acquireEvent );
103+
return nettyConnection;
104+
}
105+
finally
106+
{
107+
metricsListener.afterAcquiringOrCreating( address );
108+
}
102109
} );
103110
}
104111

@@ -167,9 +174,9 @@ public CompletionStage<Void> close()
167174
}
168175

169176
@Override
170-
public boolean isOpen()
177+
public boolean isOpen( BoltServerAddress address )
171178
{
172-
return !closed.get();
179+
return pools.containsKey( address );
173180
}
174181

175182
private ChannelPool getOrCreatePool( BoltServerAddress address )

driver/src/main/java/org/neo4j/driver/internal/metrics/ConnectionPoolMetricsListener.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ public interface ConnectionPoolMetricsListener
3232

3333
void beforeAcquiringOrCreating( PoolListenerEvent listenerEvent );
3434

35+
void afterAcquiringOrCreating();
36+
3537
void afterAcquiredOrCreated( PoolListenerEvent listenerEvent );
3638

3739
void afterTimedOutToAcquireOrCreate();

driver/src/main/java/org/neo4j/driver/internal/metrics/InternalAbstractMetrics.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,12 @@ public void beforeAcquiringOrCreating( BoltServerAddress serverAddress, Listener
6868

6969
}
7070

71+
@Override
72+
public void afterAcquiringOrCreating( BoltServerAddress serverAddress )
73+
{
74+
75+
}
76+
7177
@Override
7278
public void afterAcquiredOrCreated( BoltServerAddress serverAddress, ListenerEvent.PoolListenerEvent acquireEvent )
7379
{

driver/src/main/java/org/neo4j/driver/internal/metrics/InternalConnectionPoolMetrics.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public class InternalConnectionPoolMetrics implements ConnectionPoolMetrics, Con
4545
private final AtomicLong created = new AtomicLong();
4646
private final AtomicLong failedToCreate = new AtomicLong();
4747

48+
private final AtomicInteger acquiring = new AtomicInteger();
4849
private final AtomicLong acquired = new AtomicLong();
4950
private final AtomicLong timedOutToAcquire = new AtomicLong();
5051

@@ -90,6 +91,13 @@ public void afterClosed()
9091
public void beforeAcquiringOrCreating( PoolListenerEvent listenerEvent )
9192
{
9293
listenerEvent.start();
94+
acquiring.incrementAndGet();
95+
}
96+
97+
@Override
98+
public void afterAcquiringOrCreating()
99+
{
100+
acquiring.decrementAndGet();
93101
}
94102

95103
@Override
@@ -116,7 +124,7 @@ public String uniqueName()
116124
@Override
117125
public PoolStatus poolStatus()
118126
{
119-
if ( pool.isOpen() )
127+
if ( pool.isOpen( address ) )
120128
{
121129
return PoolStatus.Open;
122130
}
@@ -168,6 +176,12 @@ public long closed()
168176
return closed.get();
169177
}
170178

179+
@Override
180+
public int acquiring()
181+
{
182+
return acquiring.get();
183+
}
184+
171185
@Override
172186
public long acquired()
173187
{
@@ -183,8 +197,9 @@ public Histogram acquisitionTimeHistogram()
183197
@Override
184198
public String toString()
185199
{
186-
return format( "[created=%s, closed=%s, creating=%s, failedToCreate=%s, acquired=%s, " +
200+
return format( "[created=%s, closed=%s, creating=%s, failedToCreate=%s, acquiring=%s, acquired=%s, " +
187201
"timedOutToAcquire=%s, inUse=%s, idle=%s, poolStatus=%s, acquisitionTimeHistogram=%s]",
188-
created(), closed(), creating(), failedToCreate(), acquired(), timedOutToAcquire(), inUse(), idle(), poolStatus(), acquisitionTimeHistogram() );
202+
created(), closed(), creating(), failedToCreate(), acquiring(), acquired(),
203+
timedOutToAcquire(), inUse(), idle(), poolStatus(), acquisitionTimeHistogram() );
189204
}
190205
}

driver/src/main/java/org/neo4j/driver/internal/metrics/InternalMetrics.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,12 @@ public void beforeAcquiringOrCreating( BoltServerAddress serverAddress, PoolList
8787
poolMetrics( serverAddress ).beforeAcquiringOrCreating( listenerEvent );
8888
}
8989

90+
@Override
91+
public void afterAcquiringOrCreating( BoltServerAddress serverAddress )
92+
{
93+
poolMetrics( serverAddress ).afterAcquiringOrCreating();
94+
}
95+
9096
@Override
9197
public void afterAcquiredOrCreated( BoltServerAddress serverAddress, PoolListenerEvent listenerEvent )
9298
{

driver/src/main/java/org/neo4j/driver/internal/metrics/MetricsListener.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,12 @@ public interface MetricsListener
6868
*/
6969
void beforeAcquiringOrCreating( BoltServerAddress serverAddress, PoolListenerEvent acquireEvent );
7070

71+
/**
72+
* After acquiring or creating a new netty channel from pool regardless successfully or not.
73+
* @param serverAddress the server the netty channel binds to
74+
*/
75+
void afterAcquiringOrCreating( BoltServerAddress serverAddress );
76+
7177
/**
7278
* After acquiring or creating a new netty channel from pool successfully.
7379
* @param serverAddress the server the netty channel binds to

driver/src/main/java/org/neo4j/driver/internal/metrics/spi/ConnectionPoolMetrics.java

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,30 +31,27 @@ public interface ConnectionPoolMetrics
3131
String uniqueName();
3232

3333
/**
34-
* The status of the pool
34+
* The status of the pool.
3535
* @return The status of the pool.
3636
*/
3737
PoolStatus poolStatus();
3838

3939
/**
40-
* The amount of channels that are in-use (borrowed out of the pool).
41-
* The number is changing up and down from time to time.
42-
* @return The amount of channels that are in-use
40+
* The amount of channels that are currently in-use (borrowed out of the pool).
41+
* @return The amount of channels that are currently in-use
4342
*/
4443
int inUse();
4544

4645
/**
47-
* The amount of channels that are idle (buffered inside the pool).
48-
* The number is changing up and down from time to time.
49-
* @return The amount of channels that are idle.
46+
* The amount of channels that are currently idle (buffered inside the pool).
47+
* @return The amount of channels that are currently idle.
5048
*/
5149
int idle();
5250

5351
/**
54-
* The amount of channels that are waiting to be created.
55-
* The amount is increased by one when the pool noticed a request to create a new connection.
56-
* The amount is decreased by one when the pool noticed a new connection is created regardless successfully or not.
57-
* The number is changing up and down from time to time.
52+
* The amount of channels that are currently waiting to be created.
53+
* The amount is increased by one when the pool noticed a request to create a new channel.
54+
* The amount is decreased by one when the pool noticed a new channel is created successfully or failed to create.
5855
* @return The amount of channels that are waiting to be created.
5956
*/
6057
int creating();
@@ -78,21 +75,30 @@ public interface ConnectionPoolMetrics
7875
long closed();
7976

8077
/**
81-
* An increasing-only number to record how many connections have been acquired from the pool.
78+
* The current count of application requests to wait for acquiring a connection from the pool.
79+
* The reason to wait could be waiting for creating a new channel, or waiting for a channel to be free by application when the pool is full.
80+
* @return The current amount of application request to wait for acquiring a connection from the pool.
81+
*/
82+
int acquiring();
83+
84+
/**
85+
* An increasing-only number to record how many connections have been acquired from the pool
86+
* The connections acquired could hold either a newly created channel or a reused channel from the pool.
8287
* @return The amount of connections that have been acquired from the pool.
8388
*/
8489
long acquired();
8590

8691
/**
8792
* An increasing-only number to record how many times that we've failed to acquire a connection from the pool within configured maximum acquisition timeout
8893
* set by {@link Config.ConfigBuilder#withConnectionAcquisitionTimeout(long, TimeUnit)}.
94+
* The connection acquired could hold either a newly created channel or a reused channel from the pool.
8995
* @return The amount of failures to acquire a connection from the pool within maximum connection acquisition timeout.
9096
*/
9197
long timedOutToAcquire();
9298

9399
/**
94100
* An acquisition time histogram records how long it takes to acquire an connection from this pool.
95-
* The connection acquired from the pool could either be a connection idling inside the pool or a connection created by the pool.
101+
* The connection acquired from the pool could contain either a channel idling inside the pool or a channel created by the pool.
96102
* @return The acquisition time histogram.
97103
*/
98104
Histogram acquisitionTimeHistogram();

driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionPool.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,5 +35,5 @@ public interface ConnectionPool
3535

3636
CompletionStage<Void> close();
3737

38-
boolean isOpen();
38+
boolean isOpen( BoltServerAddress address );
3939
}

driver/src/test/java/org/neo4j/driver/internal/util/FailingConnectionDriverFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,9 @@ public CompletionStage<Void> close()
9595
}
9696

9797
@Override
98-
public boolean isOpen()
98+
public boolean isOpen( BoltServerAddress address )
9999
{
100-
return delegate.isOpen();
100+
return delegate.isOpen( address );
101101
}
102102
}
103103

0 commit comments

Comments
 (0)