Skip to content

Commit a1aba73

Browse files
Fix eviction logic in the PooledChannelCF
The method `destroyObject()` is called by the eviction process of Apache Pool2, but it tries to do a logical close that puts the channel back on the pool causing a `java.lang.IllegalStateException: Returned object not currently part of this pool and object lost (abandoned).` * Extract the `targetChannel` from the proxy and call a `physicalClose()` in the `destroyObject()` for a proper pool eviction **Cherry-pick to `2.4.x` & `2.3.x`**
1 parent 909ba57 commit a1aba73

File tree

3 files changed

+79
-3
lines changed

3 files changed

+79
-3
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ChannelProxy.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.amqp.rabbit.connection;
1818

19+
import org.springframework.aop.RawTargetAccess;
20+
1921
import com.rabbitmq.client.Channel;
2022

2123
/**
@@ -24,9 +26,10 @@
2426
*
2527
* @author Mark Pollack
2628
* @author Gary Russell
29+
* @author Leonardo Ferreira
2730
* @see CachingConnectionFactory
2831
*/
29-
public interface ChannelProxy extends Channel {
32+
public interface ChannelProxy extends Channel, RawTargetAccess {
3033

3134
/**
3235
* Return the target Channel of this proxy.

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/PooledChannelConnectionFactory.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,12 @@ public PooledObject<Channel> makeObject() {
292292

293293
@Override
294294
public void destroyObject(PooledObject<Channel> p) throws Exception {
295-
p.getObject().close();
295+
Channel channel = p.getObject();
296+
if (channel instanceof ChannelProxy) {
297+
channel = ((ChannelProxy) channel).getTargetChannel();
298+
}
299+
300+
ConnectionWrapper.this.physicalClose(channel);
296301
}
297302

298303
@Override

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/PooledChannelConnectionFactoryTests.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,13 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020

21+
import java.util.ArrayList;
22+
import java.util.List;
2123
import java.util.concurrent.atomic.AtomicBoolean;
2224
import java.util.concurrent.atomic.AtomicInteger;
25+
import java.util.concurrent.atomic.AtomicReference;
2326

27+
import org.apache.commons.pool2.impl.GenericObjectPool;
2428
import org.junit.jupiter.api.Test;
2529

2630
import org.springframework.amqp.core.Queue;
@@ -199,6 +203,70 @@ private void createAndCloseConnectionChannelTxAndChannelNonTx(
199203
connection.close();
200204
}
201205

206+
@Test
207+
public void evictShouldCloseAllUnneededChannelsWithoutErrors() throws Exception {
208+
PooledChannelConnectionFactory pcf = new PooledChannelConnectionFactory(new ConnectionFactory());
209+
AtomicReference<GenericObjectPool<Channel>> channelsReference = new AtomicReference<>();
210+
AtomicReference<GenericObjectPool<Channel>> txChannelsReference = new AtomicReference<>();
211+
AtomicInteger swallowedExceptionsCount = new AtomicInteger();
212+
pcf.setPoolConfigurer((pool, tx) -> {
213+
if (tx) {
214+
channelsReference.set(pool);
215+
}
216+
else {
217+
txChannelsReference.set(pool);
218+
}
219+
220+
pool.setEvictionPolicy((ec, u, idleCount) -> idleCount > ec.getMinIdle());
221+
pool.setSwallowedExceptionListener(ex -> swallowedExceptionsCount.incrementAndGet());
222+
pool.setNumTestsPerEvictionRun(5);
223+
224+
pool.setMinIdle(1);
225+
pool.setMaxIdle(5);
226+
});
227+
228+
createAndCloseFiveChannelTxAndChannelNonTx(pcf);
229+
230+
final GenericObjectPool<Channel> channels = channelsReference.get();
231+
channels.evict();
232+
233+
assertThat(channels.getNumIdle())
234+
.isEqualTo(1);
235+
assertThat(channels.getDestroyedByEvictorCount())
236+
.isEqualTo(4);
237+
238+
final GenericObjectPool<Channel> txChannels = txChannelsReference.get();
239+
txChannels.evict();
240+
assertThat(txChannels.getNumIdle())
241+
.isEqualTo(1);
242+
assertThat(txChannels.getDestroyedByEvictorCount())
243+
.isEqualTo(4);
244+
245+
assertThat(swallowedExceptionsCount.get())
246+
.isZero();
247+
}
248+
249+
private void createAndCloseFiveChannelTxAndChannelNonTx(
250+
org.springframework.amqp.rabbit.connection.ConnectionFactory connectionFactory) {
251+
int channelAmount = 5;
252+
Connection connection = connectionFactory.createConnection();
253+
254+
List<Channel> channels = new ArrayList<>(channelAmount);
255+
List<Channel> txChannels = new ArrayList<>(channelAmount);
256+
257+
for (int i = 0; i < channelAmount; i++) {
258+
channels.add(connection.createChannel(false));
259+
txChannels.add(connection.createChannel(true));
260+
}
261+
262+
for (int i = 0; i < channelAmount; i++) {
263+
RabbitUtils.closeChannel(channels.get(i));
264+
RabbitUtils.closeChannel(txChannels.get(i));
265+
}
266+
267+
connection.close();
268+
}
269+
202270
@Configuration
203271
public static class Config {
204272

0 commit comments

Comments
 (0)