Skip to content

Commit f6f89f6

Browse files
committed
Make sure qos, heartbeat, max channel are unsigned shorts
Sets the value to 0 or 65535 and issues a warning if it is out of range. Fixes #642 (cherry picked from commit f2ea862) Conflicts: src/main/java/com/rabbitmq/client/ConnectionFactory.java
1 parent 67d4013 commit f6f89f6

File tree

5 files changed

+84
-38
lines changed

5 files changed

+84
-38
lines changed

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import com.rabbitmq.client.impl.recovery.RetryHandler;
3131
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;
3232
import org.apache.http.conn.ssl.DefaultHostnameVerifier;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
3335

3436
import javax.net.SocketFactory;
3537
import javax.net.ssl.HostnameVerifier;
@@ -61,6 +63,8 @@
6163
*/
6264
public class ConnectionFactory implements Cloneable {
6365

66+
private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionFactory.class);
67+
6468
private static final int MAX_UNSIGNED_SHORT = 65535;
6569

6670
/** Default user name */
@@ -411,10 +415,11 @@ public int getRequestedChannelMax() {
411415
* @param requestedChannelMax initially requested maximum channel number; zero for unlimited
412416
*/
413417
public void setRequestedChannelMax(int requestedChannelMax) {
414-
if (requestedChannelMax < 0 || requestedChannelMax > MAX_UNSIGNED_SHORT) {
415-
throw new IllegalArgumentException("Requested channel max must be between 0 and " + MAX_UNSIGNED_SHORT);
418+
this.requestedChannelMax = ensureUnsignedShort(requestedChannelMax);
419+
if (this.requestedChannelMax != requestedChannelMax) {
420+
LOGGER.warn("Requested channel max must be between 0 and {}, value has been set to {} instead of {}",
421+
MAX_UNSIGNED_SHORT, this.requestedChannelMax, requestedChannelMax);
416422
}
417-
this.requestedChannelMax = requestedChannelMax;
418423
}
419424

420425
/**
@@ -510,10 +515,11 @@ public int getShutdownTimeout() {
510515
* @see <a href="https://rabbitmq.com/heartbeats.html">RabbitMQ Heartbeats Guide</a>
511516
*/
512517
public void setRequestedHeartbeat(int requestedHeartbeat) {
513-
if (requestedHeartbeat < 0 || requestedHeartbeat > MAX_UNSIGNED_SHORT) {
514-
throw new IllegalArgumentException("Requested heartbeat must be between 0 and " + MAX_UNSIGNED_SHORT);
518+
this.requestedHeartbeat = ensureUnsignedShort(requestedHeartbeat);
519+
if (this.requestedHeartbeat != requestedHeartbeat) {
520+
LOGGER.warn("Requested heartbeat must be between 0 and {}, value has been set to {} instead of {}",
521+
MAX_UNSIGNED_SHORT, this.requestedHeartbeat, requestedHeartbeat);
515522
}
516-
this.requestedHeartbeat = requestedHeartbeat;
517523
}
518524

519525
/**
@@ -1608,4 +1614,14 @@ public void setConnectionPostProcessor(ConnectionPostProcessor connectionPostPro
16081614
public void setTrafficListener(TrafficListener trafficListener) {
16091615
this.trafficListener = trafficListener;
16101616
}
1617+
1618+
public static int ensureUnsignedShort(int value) {
1619+
if (value < 0) {
1620+
return 0;
1621+
} else if (value > MAX_UNSIGNED_SHORT) {
1622+
return MAX_UNSIGNED_SHORT;
1623+
} else {
1624+
return value;
1625+
}
1626+
}
16111627
}

src/main/java/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -386,12 +386,15 @@ public void start()
386386
}
387387

388388
try {
389-
int channelMax =
389+
int negotiatedChannelMax =
390390
negotiateChannelMax(this.requestedChannelMax,
391391
connTune.getChannelMax());
392392

393-
if (!checkUnsignedShort(channelMax)) {
394-
throw new IllegalArgumentException("Negotiated channel max must be between 0 and " + MAX_UNSIGNED_SHORT + ": " + channelMax);
393+
int channelMax = ConnectionFactory.ensureUnsignedShort(negotiatedChannelMax);
394+
395+
if (channelMax != negotiatedChannelMax) {
396+
LOGGER.warn("Channel max must be between 0 and {}, value has been set to {} instead of {}",
397+
MAX_UNSIGNED_SHORT, channelMax, negotiatedChannelMax);
395398
}
396399

397400
_channelManager = instantiateChannelManager(channelMax, threadFactory);
@@ -401,12 +404,15 @@ public void start()
401404
connTune.getFrameMax());
402405
this._frameMax = frameMax;
403406

404-
int heartbeat =
407+
int negotiatedHeartbeat =
405408
negotiatedMaxValue(this.requestedHeartbeat,
406409
connTune.getHeartbeat());
407410

408-
if (!checkUnsignedShort(heartbeat)) {
409-
throw new IllegalArgumentException("Negotiated heartbeat must be between 0 and " + MAX_UNSIGNED_SHORT + ": " + heartbeat);
411+
int heartbeat = ConnectionFactory.ensureUnsignedShort(negotiatedHeartbeat);
412+
413+
if (heartbeat != negotiatedHeartbeat) {
414+
LOGGER.warn("Heartbeat must be between 0 and {}, value has been set to {} instead of {}",
415+
MAX_UNSIGNED_SHORT, heartbeat, negotiatedHeartbeat);
410416
}
411417

412418
setHeartbeat(heartbeat);

src/main/java/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -644,10 +644,12 @@ public AMQCommand transformReply(AMQCommand command) {
644644
public void basicQos(int prefetchSize, int prefetchCount, boolean global)
645645
throws IOException
646646
{
647-
if (prefetchCount < 0 || prefetchCount > MAX_UNSIGNED_SHORT) {
648-
throw new IllegalArgumentException("Prefetch count must be between 0 and " + MAX_UNSIGNED_SHORT);
647+
int unsignedShortPrefetchCount = ConnectionFactory.ensureUnsignedShort(prefetchCount);
648+
if (unsignedShortPrefetchCount != prefetchCount) {
649+
LOGGER.warn("Prefetch count must be between 0 and {}, value has been set to {} instead of {}",
650+
MAX_UNSIGNED_SHORT, unsignedShortPrefetchCount, prefetchCount);
649651
}
650-
exnWrappingRpc(new Basic.Qos(prefetchSize, prefetchCount, global));
652+
exnWrappingRpc(new Basic.Qos(prefetchSize, unsignedShortPrefetchCount, global));
651653
}
652654

653655
/** Public API - {@inheritDoc} */

src/test/java/com/rabbitmq/client/test/ChannelNTest.java

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,10 @@
2525
import java.io.IOException;
2626
import java.util.concurrent.ExecutorService;
2727
import java.util.concurrent.Executors;
28+
import java.util.concurrent.atomic.AtomicReference;
2829
import java.util.stream.Stream;
2930

30-
import static org.assertj.core.api.Assertions.assertThatThrownBy;
31+
import static org.assertj.core.api.Assertions.assertThat;
3132

3233
public class ChannelNTest {
3334

@@ -64,23 +65,43 @@ public void callingBasicCancelForUnknownConsumerThrowsException() throws Excepti
6465
@Test
6566
public void qosShouldBeUnsignedShort() {
6667
AMQConnection connection = Mockito.mock(AMQConnection.class);
67-
ChannelN channel = new ChannelN(connection, 1, consumerWorkService);
68+
AtomicReference<com.rabbitmq.client.AMQP.Basic.Qos> qosMethod = new AtomicReference<>();
69+
ChannelN channel = new ChannelN(connection, 1, consumerWorkService) {
70+
@Override
71+
public AMQCommand exnWrappingRpc(Method m) {
72+
qosMethod.set((com.rabbitmq.client.AMQP.Basic.Qos) m);
73+
return null;
74+
}
75+
};
6876
class TestConfig {
6977
int value;
7078
Consumer call;
79+
int expected;
7180

72-
public TestConfig(int value, Consumer call) {
81+
public TestConfig(int value, Consumer call, int expected) {
7382
this.value = value;
7483
this.call = call;
84+
this.expected = expected;
7585
}
7686
}
7787
Consumer qos = value -> channel.basicQos(value);
7888
Consumer qosGlobal = value -> channel.basicQos(value, true);
7989
Consumer qosPrefetchSize = value -> channel.basicQos(10, value, true);
8090
Stream.of(
81-
new TestConfig(-1, qos), new TestConfig(65536, qos)
82-
).flatMap(config -> Stream.of(config, new TestConfig(config.value, qosGlobal), new TestConfig(config.value, qosPrefetchSize)))
83-
.forEach(config -> assertThatThrownBy(() -> config.call.apply(config.value)).isInstanceOf(IllegalArgumentException.class));
91+
new TestConfig(-1, qos, 0), new TestConfig(65536, qos, 65535),
92+
new TestConfig(10, qos, 10), new TestConfig(0, qos, 0)
93+
).flatMap(config -> Stream.of(config, new TestConfig(config.value, qosGlobal, config.expected), new TestConfig(config.value, qosPrefetchSize, config.expected)))
94+
.forEach(config -> {
95+
try {
96+
assertThat(qosMethod.get()).isNull();
97+
config.call.apply(config.value);
98+
assertThat(qosMethod.get()).isNotNull();
99+
assertThat(qosMethod.get().getPrefetchCount()).isEqualTo(config.expected);
100+
qosMethod.set(null);
101+
} catch (Exception e) {
102+
e.printStackTrace();
103+
}
104+
});
84105
}
85106

86107
interface Consumer {

src/test/java/com/rabbitmq/client/test/ConnectionFactoryTest.java

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,10 @@
3232
import java.util.concurrent.atomic.AtomicBoolean;
3333
import java.util.concurrent.atomic.AtomicReference;
3434
import java.util.function.Consumer;
35+
import java.util.function.Supplier;
3536
import java.util.stream.Stream;
3637

3738
import static org.assertj.core.api.Assertions.assertThat;
38-
import static org.assertj.core.api.Assertions.assertThatThrownBy;
3939
import static org.mockito.Mockito.*;
4040

4141
public class ConnectionFactoryTest {
@@ -168,33 +168,34 @@ protected synchronized FrameHandlerFactory createFrameHandlerFactory() {
168168
public void heartbeatAndChannelMaxMustBeUnsignedShorts() {
169169
class TestConfig {
170170
int value;
171-
Consumer<Integer> call;
172-
boolean expectException;
171+
Supplier<Integer> getCall;
172+
Consumer<Integer> setCall;
173+
int expected;
173174

174-
public TestConfig(int value, Consumer<Integer> call, boolean expectException) {
175+
public TestConfig(int value, Supplier<Integer> getCall, Consumer<Integer> setCall, int expected) {
175176
this.value = value;
176-
this.call = call;
177-
this.expectException = expectException;
177+
this.getCall = getCall;
178+
this.setCall = setCall;
179+
this.expected = expected;
178180
}
179181
}
180182

181183
ConnectionFactory cf = new ConnectionFactory();
184+
Supplier<Integer> getHeartbeart = () -> cf.getRequestedHeartbeat();
182185
Consumer<Integer> setHeartbeat = cf::setRequestedHeartbeat;
186+
Supplier<Integer> getChannelMax = () -> cf.getRequestedChannelMax();
183187
Consumer<Integer> setChannelMax = cf::setRequestedChannelMax;
184188

185189
Stream.of(
186-
new TestConfig(0, setHeartbeat, false),
187-
new TestConfig(10, setHeartbeat, false),
188-
new TestConfig(65535, setHeartbeat, false),
189-
new TestConfig(-1, setHeartbeat, true),
190-
new TestConfig(65536, setHeartbeat, true))
191-
.flatMap(config -> Stream.of(config, new TestConfig(config.value, setChannelMax, config.expectException)))
190+
new TestConfig(0, getHeartbeart, setHeartbeat, 0),
191+
new TestConfig(10, getHeartbeart, setHeartbeat, 10),
192+
new TestConfig(65535, getHeartbeart, setHeartbeat, 65535),
193+
new TestConfig(-1, getHeartbeart, setHeartbeat, 0),
194+
new TestConfig(65536, getHeartbeart, setHeartbeat, 65535))
195+
.flatMap(config -> Stream.of(config, new TestConfig(config.value, getChannelMax, setChannelMax, config.expected)))
192196
.forEach(config -> {
193-
if (config.expectException) {
194-
assertThatThrownBy(() -> config.call.accept(config.value)).isInstanceOf(IllegalArgumentException.class);
195-
} else {
196-
config.call.accept(config.value);
197-
}
197+
config.setCall.accept(config.value);
198+
assertThat(config.getCall.get()).isEqualTo(config.expected);
198199
});
199200

200201
}

0 commit comments

Comments
 (0)