Skip to content

Commit 0f14836

Browse files
committed
Check message too large goes back to queue
In case manual acknowledgement is used. (cherry picked from commit adb9091)
1 parent 3cc1c62 commit 0f14836

File tree

2 files changed

+81
-5
lines changed

2 files changed

+81
-5
lines changed

src/test/java/com/rabbitmq/client/test/BrokerTestCase.java

-2
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,6 @@ protected boolean isAutomaticRecoveryEnabled() {
6363

6464
@BeforeEach
6565
public void setUp(TestInfo testInfo) throws IOException, TimeoutException {
66-
67-
6866
Assumptions.assumeTrue(shouldRun());
6967
this.testInfo = testInfo;
7068
openConnection();

src/test/java/com/rabbitmq/client/test/MaxInboundMessageSizeTest.java

+81-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
// Copyright (c) 2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
1+
// Copyright (c) 2023-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom
2+
// Inc.
3+
// and/or its subsidiaries.
24
//
35
// This software, the RabbitMQ Java client library, is triple-licensed under the
46
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -20,11 +22,13 @@
2022

2123
import com.rabbitmq.client.*;
2224
import java.io.IOException;
25+
import java.util.UUID;
2326
import java.util.concurrent.CountDownLatch;
2427
import java.util.concurrent.TimeoutException;
2528
import java.util.concurrent.atomic.AtomicReference;
2629
import org.junit.jupiter.params.ParameterizedTest;
2730
import org.junit.jupiter.params.provider.CsvSource;
31+
import org.junit.jupiter.params.provider.ValueSource;
2832

2933
public class MaxInboundMessageSizeTest extends BrokerTestCase {
3034

@@ -64,6 +68,60 @@ void maxInboundMessageSizeMustBeEnforced(int maxMessageSize, int frameMax, boole
6468
byte[] body = new byte[maxMessageSize * 2];
6569
ch.basicPublish("", q, null, body);
6670
ch.waitForConfirmsOrDie();
71+
AtomicReference<Throwable> channelException = new AtomicReference<>();
72+
CountDownLatch channelErrorLatch = new CountDownLatch(1);
73+
ch.addShutdownListener(
74+
cause -> {
75+
channelException.set(cause.getCause());
76+
channelErrorLatch.countDown();
77+
});
78+
AtomicReference<Throwable> connectionException = new AtomicReference<>();
79+
CountDownLatch connectionErrorLatch = new CountDownLatch(1);
80+
c.addShutdownListener(
81+
cause -> {
82+
connectionException.set(cause.getCause());
83+
connectionErrorLatch.countDown();
84+
});
85+
if (basicGet) {
86+
try {
87+
ch.basicGet(q, true);
88+
} catch (Exception e) {
89+
// OK for basicGet
90+
}
91+
} else {
92+
ch.basicConsume(q, new DefaultConsumer(ch));
93+
}
94+
assertThat(channelErrorLatch).is(completed());
95+
assertThat(channelException.get())
96+
.isInstanceOf(IllegalStateException.class)
97+
.hasMessageContaining("Message body is too large");
98+
assertThat(connectionErrorLatch).is(completed());
99+
assertThat(connectionException.get())
100+
.isInstanceOf(IllegalStateException.class)
101+
.hasMessageContaining("Message body is too large");
102+
} finally {
103+
safeClose(c);
104+
}
105+
}
106+
107+
@ParameterizedTest
108+
@ValueSource(booleans = {true, false})
109+
void largeMessageShouldGoBackToQueue(boolean basicGet) throws Exception {
110+
int maxMessageSize = 5_000;
111+
int maxFrameSize = maxMessageSize * 4;
112+
ConnectionFactory cf = newConnectionFactory();
113+
cf.setMaxInboundMessageBodySize(maxMessageSize);
114+
cf.setRequestedFrameMax(maxFrameSize);
115+
String messageId = UUID.randomUUID().toString();
116+
Connection c = cf.newConnection();
117+
try {
118+
Channel ch = c.createChannel();
119+
ch.confirmSelect();
120+
AMQP.BasicProperties.Builder propsBuilder = new AMQP.BasicProperties.Builder();
121+
propsBuilder.messageId(messageId);
122+
byte[] body = new byte[maxMessageSize * 2];
123+
ch.basicPublish("", q, propsBuilder.build(), body);
124+
ch.waitForConfirmsOrDie();
67125
AtomicReference<Throwable> exception = new AtomicReference<>();
68126
CountDownLatch errorLatch = new CountDownLatch(1);
69127
ch.addShutdownListener(
@@ -73,12 +131,12 @@ void maxInboundMessageSizeMustBeEnforced(int maxMessageSize, int frameMax, boole
73131
});
74132
if (basicGet) {
75133
try {
76-
ch.basicGet(q, true);
134+
ch.basicGet(q, false);
77135
} catch (Exception e) {
78136
// OK for basicGet
79137
}
80138
} else {
81-
ch.basicConsume(q, new DefaultConsumer(ch));
139+
ch.basicConsume(q, false, new DefaultConsumer(ch));
82140
}
83141
assertThat(errorLatch).is(completed());
84142
assertThat(exception.get())
@@ -87,6 +145,26 @@ void maxInboundMessageSizeMustBeEnforced(int maxMessageSize, int frameMax, boole
87145
} finally {
88146
safeClose(c);
89147
}
148+
149+
cf = newConnectionFactory();
150+
cf.setMaxInboundMessageBodySize(maxMessageSize * 3);
151+
cf.setRequestedFrameMax(maxFrameSize * 3);
152+
try (Connection conn = cf.newConnection()) {
153+
AtomicReference<String> receivedMessageId = new AtomicReference<>();
154+
Channel ch = conn.createChannel();
155+
CountDownLatch consumeLatch = new CountDownLatch(1);
156+
ch.basicConsume(
157+
q,
158+
true,
159+
(consumerTag, message) -> {
160+
receivedMessageId.set(message.getProperties().getMessageId());
161+
consumeLatch.countDown();
162+
},
163+
consumerTag -> {});
164+
165+
assertThat(consumeLatch).is(completed());
166+
assertThat(receivedMessageId).hasValue(messageId);
167+
}
90168
}
91169

92170
@Override

0 commit comments

Comments
 (0)