Skip to content

Commit d85c2d4

Browse files
committed
Send empty message body instead of null message body
Null message body is forbidden in the AMQP 1.0 specification. We set a data section with an empty binary when the message body has not been set. Stream messages are not checked on the server side, but messages from other protocols will be checked for a non-null body in RabbitMQ 4.0, and letting null body stream messages through will crash when those messages are read with other protocols. References rabbitmq/rabbitmq-server#6835 Fixes #544
1 parent 77dec7a commit d85c2d4

File tree

7 files changed

+25
-10
lines changed

7 files changed

+25
-10
lines changed

src/main/java/com/rabbitmq/stream/codec/QpidProtonCodec.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,14 @@
2525
import java.util.UUID;
2626
import java.util.function.Function;
2727
import org.apache.qpid.proton.amqp.*;
28-
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
29-
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
30-
import org.apache.qpid.proton.amqp.messaging.Data;
31-
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
28+
import org.apache.qpid.proton.amqp.messaging.*;
3229
import org.apache.qpid.proton.codec.ReadableBuffer;
3330
import org.apache.qpid.proton.codec.WritableBuffer;
3431

3532
public class QpidProtonCodec implements Codec {
3633

34+
static final Section EMPTY_BODY = new Data(new Binary(new byte[0]));
35+
3736
private static final Function<String, String> MESSAGE_ANNOTATIONS_STRING_KEY_EXTRACTOR = k -> k;
3837
private static final Function<Symbol, String> MESSAGE_ANNOTATIONS_SYMBOL_KEY_EXTRACTOR =
3938
Symbol::toString;
@@ -233,13 +232,16 @@ public EncodedMessage encode(Message message) {
233232
qpidMessage.setMessageAnnotations(new MessageAnnotations(messageAnnotations));
234233
}
235234

236-
if (message.getBodyAsBinary() != null) {
235+
if (message.getBodyAsBinary() == null) {
236+
qpidMessage.setBody(EMPTY_BODY);
237+
} else {
237238
qpidMessage.setBody(new Data(new Binary(message.getBodyAsBinary())));
238239
}
239240
}
240241
int bufferSize;
241242
if (qpidMessage.getBody() instanceof Data) {
242243
bufferSize = (int) (((Data) qpidMessage.getBody()).getValue().getLength() * 1.5);
244+
bufferSize = bufferSize == 0 ? 128 : bufferSize;
243245
} else {
244246
bufferSize = 8192;
245247
}

src/main/java/com/rabbitmq/stream/codec/QpidProtonMessageBuilder.java

+5
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
1515
package com.rabbitmq.stream.codec;
1616

17+
import static com.rabbitmq.stream.codec.QpidProtonCodec.EMPTY_BODY;
18+
1719
import com.rabbitmq.stream.Message;
1820
import com.rabbitmq.stream.MessageBuilder;
1921
import java.math.BigDecimal;
@@ -57,6 +59,9 @@ public Message build() {
5759
message.setMessageAnnotations(
5860
new MessageAnnotations(messageAnnotationsBuilder.messageAnnotations));
5961
}
62+
if (message.getBody() == null) {
63+
message.setBody(EMPTY_BODY);
64+
}
6065
return new QpidProtonCodec.QpidProtonAmqpMessageWrapper(
6166
hasPublishingId, publishingId, message);
6267
} else {

src/main/java/com/rabbitmq/stream/codec/SimpleCodec.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,12 @@
2323

2424
public class SimpleCodec implements Codec {
2525

26+
static final byte[] EMPTY_BODY = new byte[0];
27+
2628
@Override
2729
public EncodedMessage encode(Message message) {
28-
return new EncodedMessage(message.getBodyAsBinary().length, message.getBodyAsBinary());
30+
byte[] body = message.getBodyAsBinary() == null ? EMPTY_BODY : message.getBodyAsBinary();
31+
return new EncodedMessage(body.length, body);
2932
}
3033

3134
@Override

src/main/java/com/rabbitmq/stream/codec/SwiftMqCodec.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232

3333
public class SwiftMqCodec implements Codec {
3434

35+
static final Data EMPTY_BODY = new Data(new byte[0]);
36+
3537
private static Object convertAmqpMapValue(AMQPType value) {
3638
if (value instanceof AMQPBoolean) {
3739
return ((AMQPBoolean) value).getValue() ? Boolean.TRUE : Boolean.FALSE;
@@ -260,7 +262,9 @@ public EncodedMessage encode(Message message) {
260262
}
261263
}
262264

263-
if (message.getBodyAsBinary() != null) {
265+
if (message.getBodyAsBinary() == null) {
266+
outboundMessage.addData(EMPTY_BODY);
267+
} else {
264268
outboundMessage.addData(new Data(message.getBodyAsBinary()));
265269
}
266270
}

src/main/java/com/rabbitmq/stream/codec/SwiftMqMessageBuilder.java

+3
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,9 @@ public Message build() {
6666
throw new StreamException("Error while setting application properties", e);
6767
}
6868
}
69+
if (outboundMessage.getData() == null) {
70+
outboundMessage.addData(SwiftMqCodec.EMPTY_BODY);
71+
}
6972
return new SwiftMqCodec.SwiftMqAmqpMessageWrapper(
7073
hasPublishingId, publishingId, outboundMessage);
7174
} else {

src/main/java/com/rabbitmq/stream/codec/WrapperMessageBuilder.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public Message build() {
4242
return new SimpleMessage(
4343
this.hasPublishingId,
4444
this.publishingId,
45-
body,
45+
body == null ? SimpleCodec.EMPTY_BODY : body,
4646
this.messageAnnotationsBuilder == null
4747
? null
4848
: this.messageAnnotationsBuilder.messageAnnotations,

src/test/java/com/rabbitmq/stream/impl/AmqpInteroperabilityTest.java

-2
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,6 @@
3131
import com.rabbitmq.stream.codec.SwiftMqCodec;
3232
import com.rabbitmq.stream.impl.Client.ClientParameters;
3333
import com.rabbitmq.stream.impl.Client.Response;
34-
import com.rabbitmq.stream.impl.TestUtils.BrokerVersion;
35-
import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast;
3634
import java.nio.charset.Charset;
3735
import java.nio.charset.StandardCharsets;
3836
import java.util.*;

0 commit comments

Comments
 (0)