Skip to content

Commit 5418cb7

Browse files
author
Zhen Li
committed
Update netty version to the latest.
In the updated netty pool, `ChannelPoolHandler#channelCreated` is called at channel creation, and then `ChannelPoolHandler#channelAcquired` is called when the connection is borrowed out of the pool. When acquiring a connection from a newly created pool, a new connection will be created and then acquired. In previous netty version, when a connection is created, then it is directly lent out of the pool without being acquired. Thus we need to change some logic regarding how to count in use and idle connections.
1 parent 4856b72 commit 5418cb7

File tree

5 files changed

+42
-23
lines changed

5 files changed

+42
-23
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelTracker.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,9 +91,8 @@ public void channelCreated( Channel channel, ListenerEvent creatingEvent )
9191
log.debug( "Channel [0x%s] created. Local address: %s, remote address: %s",
9292
channel.id(), channel.localAddress(), channel.remoteAddress() );
9393

94-
incrementInUse( channel );
94+
incrementIdle( channel ); // when it is created, we count it as idle as it has not been acquired out of the pool
9595
metricsListener.afterCreated( poolId( channel ), creatingEvent );
96-
9796
allChannels.add( channel );
9897
}
9998

driver/src/test/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImplIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ void shouldFailToAcquireConnectionWhenPoolIsClosed()
123123
assertThrows( ServiceUnavailableException.class, () -> await( pool.acquire( neo4j.address() ) ) );
124124
assertThat( error.getMessage(), containsString( "closed while acquiring a connection" ) );
125125
assertThat( error.getCause(), instanceOf( IllegalStateException.class ) );
126-
assertThat( error.getCause().getMessage(), containsString( "FixedChannelPooled was closed" ) );
126+
assertThat( error.getCause().getMessage(), containsString( "FixedChannelPool was closed" ) );
127127
}
128128

129129
private ConnectionPoolImpl newPool() throws Exception

driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelTrackerTest.java

Lines changed: 35 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -49,15 +49,15 @@ class NettyChannelTrackerTest
4949
private final NettyChannelTracker tracker = new NettyChannelTracker( DEV_NULL_METRICS, mock( ChannelGroup.class ), DEV_NULL_LOGGING );
5050

5151
@Test
52-
void shouldIncrementInUseCountWhenChannelCreated()
52+
void shouldIncrementIdleCountWhenChannelCreated()
5353
{
5454
Channel channel = newChannel();
5555
assertEquals( 0, tracker.inUseChannelCount( address ) );
5656
assertEquals( 0, tracker.idleChannelCount( address ) );
5757

5858
tracker.channelCreated( channel, null );
59-
assertEquals( 1, tracker.inUseChannelCount( address ) );
60-
assertEquals( 0, tracker.idleChannelCount( address ) );
59+
assertEquals( 0, tracker.inUseChannelCount( address ) );
60+
assertEquals( 1, tracker.idleChannelCount( address ) );
6161
}
6262

6363
@Test
@@ -68,33 +68,45 @@ void shouldIncrementInUseCountWhenChannelAcquired()
6868
assertEquals( 0, tracker.idleChannelCount( address ) );
6969

7070
tracker.channelCreated( channel, null );
71+
assertEquals( 0, tracker.inUseChannelCount( address ) );
72+
assertEquals( 1, tracker.idleChannelCount( address ) );
73+
74+
tracker.channelAcquired( channel );
7175
assertEquals( 1, tracker.inUseChannelCount( address ) );
7276
assertEquals( 0, tracker.idleChannelCount( address ) );
77+
}
7378

74-
tracker.channelReleased( channel );
79+
@Test
80+
void shouldIncrementIdleCountWhenChannelReleased()
81+
{
82+
Channel channel = newChannel();
7583
assertEquals( 0, tracker.inUseChannelCount( address ) );
76-
assertEquals( 1, tracker.idleChannelCount( address ) );
84+
assertEquals( 0, tracker.idleChannelCount( address ) );
7785

78-
tracker.channelAcquired( channel );
86+
channelCreatedAndAcquired( channel );
7987
assertEquals( 1, tracker.inUseChannelCount( address ) );
8088
assertEquals( 0, tracker.idleChannelCount( address ) );
89+
90+
tracker.channelReleased( channel );
91+
assertEquals( 0, tracker.inUseChannelCount( address ) );
92+
assertEquals( 1, tracker.idleChannelCount( address ) );
8193
}
8294

8395
@Test
84-
void shouldIncrementInuseCountForAddress()
96+
void shouldIncrementIdleCountForAddress()
8597
{
8698
Channel channel1 = newChannel();
8799
Channel channel2 = newChannel();
88100
Channel channel3 = newChannel();
89101

90-
assertEquals( 0, tracker.inUseChannelCount( address ) );
102+
assertEquals( 0, tracker.idleChannelCount( address ) );
91103
tracker.channelCreated( channel1, null );
92-
assertEquals( 1, tracker.inUseChannelCount( address ) );
104+
assertEquals( 1, tracker.idleChannelCount( address ) );
93105
tracker.channelCreated( channel2, null );
94-
assertEquals( 2, tracker.inUseChannelCount( address ) );
106+
assertEquals( 2, tracker.idleChannelCount( address ) );
95107
tracker.channelCreated( channel3, null );
96-
assertEquals( 3, tracker.inUseChannelCount( address ) );
97-
assertEquals( 0, tracker.idleChannelCount( address ) );
108+
assertEquals( 3, tracker.idleChannelCount( address ) );
109+
assertEquals( 0, tracker.inUseChannelCount( address ) );
98110
}
99111

100112
@Test
@@ -104,9 +116,9 @@ void shouldDecrementCountForAddress()
104116
Channel channel2 = newChannel();
105117
Channel channel3 = newChannel();
106118

107-
tracker.channelCreated( channel1, null );
108-
tracker.channelCreated( channel2, null );
109-
tracker.channelCreated( channel3, null );
119+
channelCreatedAndAcquired( channel1 );
120+
channelCreatedAndAcquired( channel2 );
121+
channelCreatedAndAcquired( channel3 );
110122
assertEquals( 3, tracker.inUseChannelCount( address ) );
111123
assertEquals( 0, tracker.idleChannelCount( address ) );
112124

@@ -126,7 +138,7 @@ void shouldDecreaseIdleWhenClosedOutsidePool() throws Throwable
126138
{
127139
// Given
128140
Channel channel = newChannel();
129-
tracker.channelCreated( channel, null );
141+
channelCreatedAndAcquired( channel );
130142
assertEquals( 1, tracker.inUseChannelCount( address ) );
131143
assertEquals( 0, tracker.idleChannelCount( address ) );
132144

@@ -147,7 +159,7 @@ void shouldDecreaseIdleWhenClosedInsidePool() throws Throwable
147159
{
148160
// Given
149161
Channel channel = newChannel();
150-
tracker.channelCreated( channel, null );
162+
channelCreatedAndAcquired( channel );
151163
assertEquals( 1, tracker.inUseChannelCount( address ) );
152164
assertEquals( 0, tracker.idleChannelCount( address ) );
153165

@@ -160,7 +172,6 @@ void shouldDecreaseIdleWhenClosedInsidePool() throws Throwable
160172
// Then
161173
assertEquals( 0, tracker.inUseChannelCount( address ) );
162174
assertEquals( 0, tracker.idleChannelCount( address ) );
163-
164175
}
165176

166177
@Test
@@ -226,4 +237,10 @@ private EmbeddedChannel newChannelWithProtocolV3()
226237
setMessageDispatcher( channel, mock( InboundMessageDispatcher.class ) );
227238
return channel;
228239
}
240+
241+
private void channelCreatedAndAcquired( Channel channel )
242+
{
243+
tracker.channelCreated( channel, null );
244+
tracker.channelAcquired( channel );
245+
}
229246
}

driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/RoutingTableAndConnectionPoolTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -407,7 +407,10 @@ private static class PooledConnection implements Connection
407407
this.channel = channel;
408408
this.pool = pool;
409409

410-
this.channel.attr( AttributeKey.valueOf( "channelPool" ) ).setIfAbsent( pool );
410+
// This is needed to make netty connection pool to believe this channel is created by the pool.
411+
// Otherwise the netty connection pool will refuse to release the channel back to the pool.
412+
AttributeKey<ExtendedChannelPool> poolKey = AttributeKey.valueOf( "channelPool." + System.identityHashCode( pool ) );
413+
this.channel.attr( poolKey ).setIfAbsent( pool );
411414
}
412415

413416
@Override

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@
6464
<dependency>
6565
<groupId>io.netty</groupId>
6666
<artifactId>netty-handler</artifactId>
67-
<version>4.1.22.Final</version>
67+
<version>4.1.41.Final</version>
6868
</dependency>
6969
<dependency>
7070
<groupId>io.projectreactor</groupId>

0 commit comments

Comments
 (0)