Skip to content

Commit ab0c62d

Browse files
Merge pull request #1057 from sergio91pt/confirm-select-once-per-channel
Do not confirmSelect more than once per channel
2 parents 9f0dd42 + f5c26a7 commit ab0c62d

File tree

2 files changed

+36
-1
lines changed

2 files changed

+36
-1
lines changed

src/main/java/com/rabbitmq/client/impl/ChannelN.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,9 @@ public class ChannelN extends AMQChannel implements com.rabbitmq.client.Channel
8282
private final SortedSet<Long> unconfirmedSet =
8383
Collections.synchronizedSortedSet(new TreeSet<Long>());
8484

85+
/** Whether the confirm select method has been successfully activated */
86+
private boolean confirmSelectActivated = false;
87+
8588
/** Whether any nacks have been received since the last waitForConfirms(). */
8689
private volatile boolean onlyAcksReceived = true;
8790

@@ -1553,10 +1556,16 @@ public Tx.RollbackOk txRollback()
15531556
public Confirm.SelectOk confirmSelect()
15541557
throws IOException
15551558
{
1559+
if (confirmSelectActivated) {
1560+
return new Confirm.SelectOk();
1561+
}
1562+
15561563
if (nextPublishSeqNo == 0) nextPublishSeqNo = 1;
1557-
return (Confirm.SelectOk)
1564+
Confirm.SelectOk result = (Confirm.SelectOk)
15581565
exnWrappingRpc(new Confirm.Select(false)).getMethod();
15591566

1567+
confirmSelectActivated = true;
1568+
return result;
15601569
}
15611570

15621571
/** Public API - {@inheritDoc} */

src/test/java/com/rabbitmq/client/test/ChannelNTest.java

+26
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515

1616
package com.rabbitmq.client.test;
1717

18+
import com.rabbitmq.client.Command;
1819
import com.rabbitmq.client.Method;
20+
import com.rabbitmq.client.TrafficListener;
1921
import com.rabbitmq.client.impl.*;
2022
import org.junit.jupiter.api.AfterEach;
2123
import org.junit.jupiter.api.BeforeEach;
@@ -26,6 +28,7 @@
2628
import java.util.concurrent.Executors;
2729
import java.util.stream.Stream;
2830
import static org.assertj.core.api.Assertions.assertThatThrownBy;
31+
import static org.junit.jupiter.api.Assertions.assertNotNull;
2932

3033
public class ChannelNTest {
3134

@@ -81,6 +84,29 @@ public TestConfig(int value, Consumer call) {
8184
.forEach(config -> assertThatThrownBy(() -> config.call.apply(config.value)).isInstanceOf(IllegalArgumentException.class));
8285
}
8386

87+
@Test
88+
public void confirmSelectOnlySendsRPCCallOnce() throws Exception {
89+
AMQConnection connection = Mockito.mock(AMQConnection.class);
90+
TrafficListener trafficListener = Mockito.mock(TrafficListener.class);
91+
92+
Mockito.when(connection.getTrafficListener()).thenReturn(trafficListener);
93+
94+
ChannelN channel = new ChannelN(connection, 1, consumerWorkService);
95+
96+
new Thread(() -> {
97+
try {
98+
Thread.sleep(15);
99+
channel.handleCompleteInboundCommand(new AMQCommand(new AMQImpl.Confirm.SelectOk()));
100+
} catch (Exception e) {
101+
throw new RuntimeException(e);
102+
}
103+
}).start();
104+
105+
assertNotNull(channel.confirmSelect());
106+
assertNotNull(channel.confirmSelect());
107+
Mockito.verify(trafficListener, Mockito.times(1)).write(Mockito.any(Command.class));
108+
}
109+
84110
interface Consumer {
85111

86112
void apply(int value) throws Exception;

0 commit comments

Comments
 (0)