Skip to content

Commit 7253c94

Browse files
committed
Do not confirmSelect more than once per channel
In order to avoid unnecessary blocking RPC calls and conform to best practices, the Channel now checks if it is already activated confirm mode before sending a confirm.select RPC call. If confirm mode is already activated, calling confirmSelect() again returns immediately without sending an RPC call. Closes #1056
1 parent 9aec291 commit 7253c94

File tree

2 files changed

+39
-3
lines changed

2 files changed

+39
-3
lines changed

src/main/java/com/rabbitmq/client/impl/ChannelN.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,9 @@ public class ChannelN extends AMQChannel implements com.rabbitmq.client.Channel
8282
private final SortedSet<Long> unconfirmedSet =
8383
Collections.synchronizedSortedSet(new TreeSet<Long>());
8484

85+
/** Whether the confirm select method has been successfully activated */
86+
private boolean confirmSelectActivated = false;
87+
8588
/** Whether any nacks have been received since the last waitForConfirms(). */
8689
private volatile boolean onlyAcksReceived = true;
8790

@@ -1553,10 +1556,16 @@ public Tx.RollbackOk txRollback()
15531556
public Confirm.SelectOk confirmSelect()
15541557
throws IOException
15551558
{
1559+
if (confirmSelectActivated) {
1560+
return new Confirm.SelectOk();
1561+
}
1562+
15561563
if (nextPublishSeqNo == 0) nextPublishSeqNo = 1;
1557-
return (Confirm.SelectOk)
1564+
Confirm.SelectOk result = (Confirm.SelectOk)
15581565
exnWrappingRpc(new Confirm.Select(false)).getMethod();
15591566

1567+
confirmSelectActivated = true;
1568+
return result;
15601569
}
15611570

15621571
/** Public API - {@inheritDoc} */

src/test/java/com/rabbitmq/client/test/ChannelNTest.java

+29-2
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,20 @@
1515

1616
package com.rabbitmq.client.test;
1717

18+
import com.rabbitmq.client.Command;
1819
import com.rabbitmq.client.Method;
20+
import com.rabbitmq.client.TrafficListener;
1921
import com.rabbitmq.client.impl.*;
2022
import org.junit.jupiter.api.AfterEach;
2123
import org.junit.jupiter.api.BeforeEach;
2224
import org.junit.jupiter.api.Test;
2325
import org.mockito.Mockito;
2426

25-
import java.util.concurrent.ExecutorService;
26-
import java.util.concurrent.Executors;
27+
import java.io.IOException;
28+
import java.util.concurrent.*;
2729
import java.util.stream.Stream;
2830
import static org.assertj.core.api.Assertions.assertThatThrownBy;
31+
import static org.junit.jupiter.api.Assertions.assertNotNull;
2932

3033
public class ChannelNTest {
3134

@@ -81,6 +84,30 @@ public TestConfig(int value, Consumer call) {
8184
.forEach(config -> assertThatThrownBy(() -> config.call.apply(config.value)).isInstanceOf(IllegalArgumentException.class));
8285
}
8386

87+
@Test
88+
public void confirmSelectOnlySendsRPCCallOnce() throws Exception {
89+
AMQConnection connection = Mockito.mock(AMQConnection.class);
90+
TrafficListener trafficListener = Mockito.mock(TrafficListener.class);
91+
92+
Mockito.when(connection.getTrafficListener()).thenReturn(trafficListener);
93+
94+
ChannelN channel = new ChannelN(connection, 1, consumerWorkService);
95+
96+
Future<AMQImpl.Confirm.SelectOk> future = executorService.submit(() -> {
97+
try {
98+
return channel.confirmSelect();
99+
} catch (IOException e) {
100+
throw new RuntimeException(e);
101+
}
102+
});
103+
104+
channel.handleCompleteInboundCommand(new AMQCommand(new AMQImpl.Confirm.SelectOk()));
105+
106+
assertNotNull(future.get(1, TimeUnit.SECONDS));
107+
assertNotNull(channel.confirmSelect());
108+
Mockito.verify(trafficListener, Mockito.times(1)).write(Mockito.any(Command.class));
109+
}
110+
84111
interface Consumer {
85112

86113
void apply(int value) throws Exception;

0 commit comments

Comments
 (0)