Skip to content

Commit 943b4c4

Browse files
authored
Merge pull request #729 from rabbitmq/list-map-array-in-message-annotations
Support map, list, and array types when decoding message annotations
2 parents e3d3ce7 + 85d93ab commit 943b4c4

File tree

7 files changed

+373
-43
lines changed

7 files changed

+373
-43
lines changed

src/main/java/com/rabbitmq/stream/MessageBuilder.java

+8
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
package com.rabbitmq.stream;
1616

1717
import java.math.BigDecimal;
18+
import java.util.List;
19+
import java.util.Map;
1820
import java.util.UUID;
1921

2022
/**
@@ -186,6 +188,12 @@ interface MessageAnnotationsBuilder {
186188

187189
MessageAnnotationsBuilder entrySymbol(String key, String value);
188190

191+
MessageAnnotationsBuilder entry(String key, List<?> list);
192+
193+
MessageAnnotationsBuilder entry(String key, Map<?, ?> map);
194+
195+
MessageAnnotationsBuilder entryArray(String key, Object[] array);
196+
189197
/**
190198
* Go back to the message builder
191199
*

src/main/java/com/rabbitmq/stream/codec/QpidProtonCodec.java

+23-20
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,7 @@
1919
import com.rabbitmq.stream.MessageBuilder;
2020
import com.rabbitmq.stream.Properties;
2121
import java.nio.ByteBuffer;
22-
import java.util.Date;
23-
import java.util.LinkedHashMap;
24-
import java.util.Map;
25-
import java.util.UUID;
22+
import java.util.*;
2623
import java.util.function.Function;
2724
import org.apache.qpid.proton.amqp.*;
2825
import org.apache.qpid.proton.amqp.messaging.*;
@@ -63,15 +60,15 @@ private static <K> Map<String, Object> createMapFromAmqpMap(
6360
if (amqpMap != null) {
6461
result = new LinkedHashMap<>(amqpMap.size());
6562
for (Map.Entry<K, Object> entry : amqpMap.entrySet()) {
66-
result.put(keyMaker.apply(entry.getKey()), convertApplicationProperty(entry.getValue()));
63+
result.put(keyMaker.apply(entry.getKey()), fromQpidToJava(entry.getValue()));
6764
}
6865
} else {
6966
result = null;
7067
}
7168
return result;
7269
}
7370

74-
private static Object convertApplicationProperty(Object value) {
71+
private static Object fromQpidToJava(Object value) {
7572
if (value instanceof Boolean
7673
|| value instanceof Byte
7774
|| value instanceof Short
@@ -81,7 +78,10 @@ private static Object convertApplicationProperty(Object value) {
8178
|| value instanceof Double
8279
|| value instanceof String
8380
|| value instanceof Character
84-
|| value instanceof UUID) {
81+
|| value instanceof UUID
82+
|| value instanceof List
83+
|| value instanceof Map
84+
|| value instanceof Object[]) {
8585
return value;
8686
} else if (value instanceof Binary) {
8787
return ((Binary) value).getArray();
@@ -99,9 +99,10 @@ private static Object convertApplicationProperty(Object value) {
9999
return ((Symbol) value).toString();
100100
} else if (value == null) {
101101
return null;
102+
} else if (value.getClass().isArray()) {
103+
return value;
102104
} else {
103-
throw new IllegalArgumentException(
104-
"Type not supported for an application property: " + value.getClass());
105+
throw new IllegalArgumentException("Type not supported: " + value.getClass());
105106
}
106107
}
107108

@@ -281,7 +282,10 @@ protected Object convertToQpidType(Object value) {
281282
|| value instanceof String
282283
|| value instanceof Character
283284
|| value instanceof UUID
284-
|| value instanceof Date) {
285+
|| value instanceof Date
286+
|| value instanceof List
287+
|| value instanceof Map
288+
|| value instanceof Object[]) {
285289
return value;
286290
} else if (value instanceof com.rabbitmq.stream.amqp.UnsignedByte) {
287291
return UnsignedByte.valueOf(((com.rabbitmq.stream.amqp.UnsignedByte) value).byteValue());
@@ -298,8 +302,7 @@ protected Object convertToQpidType(Object value) {
298302
} else if (value == null) {
299303
return null;
300304
} else {
301-
throw new IllegalArgumentException(
302-
"Type not supported for an application property: " + value.getClass());
305+
throw new IllegalArgumentException("Type not supported: " + value.getClass());
303306
}
304307
}
305308

@@ -634,28 +637,28 @@ public Message copy() {
634637

635638
// from
636639
// https://github.com/apache/activemq/blob/master/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpWritableBuffer.java
637-
private static class ByteArrayWritableBuffer implements WritableBuffer {
640+
static class ByteArrayWritableBuffer implements WritableBuffer {
638641

639-
public static final int DEFAULT_CAPACITY = 4 * 1024;
642+
static final int DEFAULT_CAPACITY = 4 * 1024;
640643

641-
byte[] buffer;
642-
int position;
644+
private byte[] buffer;
645+
private int position;
643646

644647
/** Creates a new WritableBuffer with default capacity. */
645-
public ByteArrayWritableBuffer() {
648+
ByteArrayWritableBuffer() {
646649
this(DEFAULT_CAPACITY);
647650
}
648651

649652
/** Create a new WritableBuffer with the given capacity. */
650-
public ByteArrayWritableBuffer(int capacity) {
653+
ByteArrayWritableBuffer(int capacity) {
651654
this.buffer = new byte[capacity];
652655
}
653656

654-
public byte[] getArray() {
657+
byte[] getArray() {
655658
return buffer;
656659
}
657660

658-
public int getArrayLength() {
661+
int getArrayLength() {
659662
return position;
660663
}
661664

src/main/java/com/rabbitmq/stream/codec/QpidProtonMessageBuilder.java

+19-4
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,7 @@
2020
import com.rabbitmq.stream.MessageBuilder;
2121
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
2222
import java.math.BigDecimal;
23-
import java.util.Date;
24-
import java.util.LinkedHashMap;
25-
import java.util.Map;
26-
import java.util.UUID;
23+
import java.util.*;
2724
import java.util.concurrent.atomic.AtomicBoolean;
2825
import org.apache.qpid.proton.amqp.Binary;
2926
import org.apache.qpid.proton.amqp.Symbol;
@@ -365,6 +362,24 @@ public MessageAnnotationsBuilder entrySymbol(String key, String value) {
365362
return this;
366363
}
367364

365+
@Override
366+
public MessageAnnotationsBuilder entry(String key, List<?> list) {
367+
messageAnnotations.put(Symbol.getSymbol(key), list);
368+
return this;
369+
}
370+
371+
@Override
372+
public MessageAnnotationsBuilder entry(String key, Map<?, ?> map) {
373+
messageAnnotations.put(Symbol.getSymbol(key), map);
374+
return this;
375+
}
376+
377+
@Override
378+
public MessageAnnotationsBuilder entryArray(String key, Object[] array) {
379+
messageAnnotations.put(Symbol.getSymbol(key), array);
380+
return this;
381+
}
382+
368383
@Override
369384
public MessageBuilder messageBuilder() {
370385
return messageBuilder;

src/main/java/com/rabbitmq/stream/codec/SwiftMqCodec.java

+142-8
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,9 @@
2525
import com.swiftmq.amqp.v100.types.*;
2626
import com.swiftmq.tools.util.DataByteArrayOutputStream;
2727
import java.io.IOException;
28-
import java.util.Date;
29-
import java.util.LinkedHashMap;
30-
import java.util.Map;
31-
import java.util.UUID;
28+
import java.lang.reflect.Array;
29+
import java.nio.charset.StandardCharsets;
30+
import java.util.*;
3231

3332
public class SwiftMqCodec implements Codec {
3433

@@ -69,13 +68,48 @@ private static Object convertAmqpMapValue(AMQPType value) {
6968
return ((AMQPUuid) value).getValue();
7069
} else if (value instanceof AMQPSymbol) {
7170
return ((AMQPSymbol) value).getValue();
71+
} else if (value instanceof AMQPList) {
72+
try {
73+
List<AMQPType> source = ((AMQPList) value).getValue();
74+
List<Object> target = new ArrayList<>(source.size());
75+
for (AMQPType o : source) {
76+
target.add(convertAmqpMapValue(o));
77+
}
78+
return target;
79+
} catch (IOException e) {
80+
throw new StreamException("Error while reading SwiftMQ list", e);
81+
}
82+
} else if (value instanceof AMQPMap) {
83+
try {
84+
Map<AMQPType, AMQPType> source = ((AMQPMap) value).getValue();
85+
Map<Object, Object> target = new LinkedHashMap<>(source.size());
86+
for (Map.Entry<AMQPType, AMQPType> entry : source.entrySet()) {
87+
target.put(convertAmqpMapValue(entry.getKey()), convertAmqpMapValue(entry.getValue()));
88+
}
89+
return target;
90+
} catch (IOException e) {
91+
throw new StreamException("Error while reading SwiftMQ map", e);
92+
}
93+
} else if (value instanceof AMQPArray) {
94+
try {
95+
AMQPType[] source = ((AMQPArray) value).getValue();
96+
Object target =
97+
Array.newInstance(
98+
source.length == 0 ? Object.class : convertAmqpMapValue(source[0]).getClass(),
99+
source.length);
100+
for (int i = 0; i < source.length; i++) {
101+
Array.set(target, i, convertAmqpMapValue(source[i]));
102+
}
103+
return target;
104+
} catch (IOException e) {
105+
throw new StreamException("Error while reading SwiftMQ array", e);
106+
}
72107
} else if (value instanceof AMQPNull) {
73108
return null;
74109
} else if (value == null) {
75110
return null;
76111
} else {
77-
throw new IllegalArgumentException(
78-
"Type not supported for an application property: " + value.getClass());
112+
throw new IllegalArgumentException("Type not supported: " + value.getClass());
79113
}
80114
}
81115

@@ -320,11 +354,111 @@ protected static AMQPType convertToSwiftMqType(Object value) {
320354
return new AMQPSymbol(value.toString());
321355
} else if (value instanceof UUID) {
322356
return new AMQPUuid((UUID) value);
357+
} else if (value instanceof List) {
358+
List<?> source = (List<?>) value;
359+
List<AMQPType> target = new ArrayList<>(source.size());
360+
for (Object o : source) {
361+
target.add(convertToSwiftMqType(o));
362+
}
363+
try {
364+
return new AMQPList(target);
365+
} catch (IOException e) {
366+
throw new StreamException("Error while creating SwiftMQ list", e);
367+
}
368+
} else if (value instanceof Map) {
369+
Map<?, ?> source = (Map<?, ?>) value;
370+
Map<AMQPType, AMQPType> target = new LinkedHashMap<>(source.size());
371+
for (Map.Entry<?, ?> entry : source.entrySet()) {
372+
target.put(convertToSwiftMqType(entry.getKey()), convertToSwiftMqType(entry.getValue()));
373+
}
374+
try {
375+
return new AMQPMap(target);
376+
} catch (IOException e) {
377+
throw new StreamException("Error while creating SwiftMQ map", e);
378+
}
379+
} else if (value instanceof Object[]) {
380+
Object[] source = (Object[]) value;
381+
AMQPType[] target = new AMQPType[source.length];
382+
for (int i = 0; i < source.length; i++) {
383+
target[i] = convertToSwiftMqType(source[i]);
384+
}
385+
try {
386+
int code = source.length == 0 ? AMQPTypeDecoder.UNKNOWN : toSwiftMqTypeCode(source[0]);
387+
return new AMQPArray(code, target);
388+
} catch (IOException e) {
389+
throw new StreamException("Error while creating SwiftMQ list", e);
390+
}
323391
} else if (value == null) {
324392
return AMQPNull.NULL;
325393
} else {
326-
throw new IllegalArgumentException(
327-
"Type not supported for an application property: " + value.getClass());
394+
throw new IllegalArgumentException("Type not supported: " + value.getClass());
395+
}
396+
}
397+
398+
protected static int toSwiftMqTypeCode(Object value) {
399+
if (value instanceof Boolean) {
400+
return AMQPTypeDecoder.BOOLEAN;
401+
} else if (value instanceof Byte) {
402+
return AMQPTypeDecoder.BYTE;
403+
} else if (value instanceof Short) {
404+
return AMQPTypeDecoder.SHORT;
405+
} else if (value instanceof Integer) {
406+
int v = (Integer) value;
407+
return (v < -128 || v > 127) ? AMQPTypeDecoder.INT : AMQPTypeDecoder.SINT;
408+
} else if (value instanceof Long) {
409+
long v = (Long) value;
410+
return (v < -128 || v > 127) ? AMQPTypeDecoder.LONG : AMQPTypeDecoder.SLONG;
411+
} else if (value instanceof UnsignedByte) {
412+
return AMQPTypeDecoder.UBYTE;
413+
} else if (value instanceof UnsignedShort) {
414+
return AMQPTypeDecoder.USHORT;
415+
} else if (value instanceof UnsignedInteger) {
416+
return AMQPTypeDecoder.UINT;
417+
} else if (value instanceof UnsignedLong) {
418+
return AMQPTypeDecoder.ULONG;
419+
} else if (value instanceof Float) {
420+
return AMQPTypeDecoder.FLOAT;
421+
} else if (value instanceof Double) {
422+
return AMQPTypeDecoder.DOUBLE;
423+
} else if (value instanceof byte[]) {
424+
return ((byte[]) value).length > 255 ? AMQPTypeDecoder.BIN32 : AMQPTypeDecoder.BIN8;
425+
} else if (value instanceof String) {
426+
return value.toString().getBytes(StandardCharsets.UTF_8).length > 255
427+
? AMQPTypeDecoder.STR32UTF8
428+
: AMQPTypeDecoder.STR8UTF8;
429+
} else if (value instanceof Character) {
430+
return AMQPTypeDecoder.CHAR;
431+
} else if (value instanceof Date) {
432+
return AMQPTypeDecoder.TIMESTAMP;
433+
} else if (value instanceof Symbol) {
434+
return value.toString().getBytes(StandardCharsets.US_ASCII).length > 255
435+
? AMQPTypeDecoder.SYM32
436+
: AMQPTypeDecoder.SYM8;
437+
} else if (value instanceof UUID) {
438+
return AMQPTypeDecoder.UUID;
439+
} else if (value instanceof List) {
440+
List<?> l = (List<?>) value;
441+
if (l.isEmpty()) {
442+
return AMQPTypeDecoder.LIST0;
443+
} else if (l.size() > 255) {
444+
return AMQPTypeDecoder.LIST32;
445+
} else {
446+
return AMQPTypeDecoder.LIST8;
447+
}
448+
} else if (value instanceof Map) {
449+
Map<?, ?> source = (Map<?, ?>) value;
450+
return source.size() * 2 > 255 ? AMQPTypeDecoder.MAP32 : AMQPTypeDecoder.MAP8;
451+
} else if (value instanceof Object[]) {
452+
Object[] source = (Object[]) value;
453+
if (source.length > 255) {
454+
return AMQPTypeDecoder.ARRAY32;
455+
} else {
456+
return AMQPTypeDecoder.ARRAY8;
457+
}
458+
} else if (value == null) {
459+
return AMQPTypeDecoder.NULL;
460+
} else {
461+
throw new IllegalArgumentException("Type not supported: " + value.getClass());
328462
}
329463
}
330464

0 commit comments

Comments
 (0)