diff --git a/src/main/java/com/rabbitmq/stream/MessageBuilder.java b/src/main/java/com/rabbitmq/stream/MessageBuilder.java index e91414d90a..02a9d3c3ee 100644 --- a/src/main/java/com/rabbitmq/stream/MessageBuilder.java +++ b/src/main/java/com/rabbitmq/stream/MessageBuilder.java @@ -15,6 +15,8 @@ package com.rabbitmq.stream; import java.math.BigDecimal; +import java.util.List; +import java.util.Map; import java.util.UUID; /** @@ -186,6 +188,12 @@ interface MessageAnnotationsBuilder { MessageAnnotationsBuilder entrySymbol(String key, String value); + MessageAnnotationsBuilder entry(String key, List list); + + MessageAnnotationsBuilder entry(String key, Map map); + + MessageAnnotationsBuilder entryArray(String key, Object[] array); + /** * Go back to the message builder * diff --git a/src/main/java/com/rabbitmq/stream/codec/QpidProtonCodec.java b/src/main/java/com/rabbitmq/stream/codec/QpidProtonCodec.java index 8790128302..abadc2f618 100644 --- a/src/main/java/com/rabbitmq/stream/codec/QpidProtonCodec.java +++ b/src/main/java/com/rabbitmq/stream/codec/QpidProtonCodec.java @@ -19,10 +19,7 @@ import com.rabbitmq.stream.MessageBuilder; import com.rabbitmq.stream.Properties; import java.nio.ByteBuffer; -import java.util.Date; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.UUID; +import java.util.*; import java.util.function.Function; import org.apache.qpid.proton.amqp.*; import org.apache.qpid.proton.amqp.messaging.*; @@ -63,7 +60,7 @@ private static Map createMapFromAmqpMap( if (amqpMap != null) { result = new LinkedHashMap<>(amqpMap.size()); for (Map.Entry entry : amqpMap.entrySet()) { - result.put(keyMaker.apply(entry.getKey()), convertApplicationProperty(entry.getValue())); + result.put(keyMaker.apply(entry.getKey()), fromQpidToJava(entry.getValue())); } } else { result = null; @@ -71,7 +68,7 @@ private static Map createMapFromAmqpMap( return result; } - private static Object convertApplicationProperty(Object value) { + private static Object fromQpidToJava(Object value) { if (value instanceof Boolean || value instanceof Byte || value instanceof Short @@ -81,7 +78,10 @@ private static Object convertApplicationProperty(Object value) { || value instanceof Double || value instanceof String || value instanceof Character - || value instanceof UUID) { + || value instanceof UUID + || value instanceof List + || value instanceof Map + || value instanceof Object[]) { return value; } else if (value instanceof Binary) { return ((Binary) value).getArray(); @@ -99,9 +99,10 @@ private static Object convertApplicationProperty(Object value) { return ((Symbol) value).toString(); } else if (value == null) { return null; + } else if (value.getClass().isArray()) { + return value; } else { - throw new IllegalArgumentException( - "Type not supported for an application property: " + value.getClass()); + throw new IllegalArgumentException("Type not supported: " + value.getClass()); } } @@ -281,7 +282,10 @@ protected Object convertToQpidType(Object value) { || value instanceof String || value instanceof Character || value instanceof UUID - || value instanceof Date) { + || value instanceof Date + || value instanceof List + || value instanceof Map + || value instanceof Object[]) { return value; } else if (value instanceof com.rabbitmq.stream.amqp.UnsignedByte) { return UnsignedByte.valueOf(((com.rabbitmq.stream.amqp.UnsignedByte) value).byteValue()); @@ -298,8 +302,7 @@ protected Object convertToQpidType(Object value) { } else if (value == null) { return null; } else { - throw new IllegalArgumentException( - "Type not supported for an application property: " + value.getClass()); + throw new IllegalArgumentException("Type not supported: " + value.getClass()); } } @@ -634,28 +637,28 @@ public Message copy() { // from // https://github.com/apache/activemq/blob/master/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpWritableBuffer.java - private static class ByteArrayWritableBuffer implements WritableBuffer { + static class ByteArrayWritableBuffer implements WritableBuffer { - public static final int DEFAULT_CAPACITY = 4 * 1024; + static final int DEFAULT_CAPACITY = 4 * 1024; - byte[] buffer; - int position; + private byte[] buffer; + private int position; /** Creates a new WritableBuffer with default capacity. */ - public ByteArrayWritableBuffer() { + ByteArrayWritableBuffer() { this(DEFAULT_CAPACITY); } /** Create a new WritableBuffer with the given capacity. */ - public ByteArrayWritableBuffer(int capacity) { + ByteArrayWritableBuffer(int capacity) { this.buffer = new byte[capacity]; } - public byte[] getArray() { + byte[] getArray() { return buffer; } - public int getArrayLength() { + int getArrayLength() { return position; } diff --git a/src/main/java/com/rabbitmq/stream/codec/QpidProtonMessageBuilder.java b/src/main/java/com/rabbitmq/stream/codec/QpidProtonMessageBuilder.java index 4d5502a1f5..1d1851bb52 100644 --- a/src/main/java/com/rabbitmq/stream/codec/QpidProtonMessageBuilder.java +++ b/src/main/java/com/rabbitmq/stream/codec/QpidProtonMessageBuilder.java @@ -20,10 +20,7 @@ import com.rabbitmq.stream.MessageBuilder; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.math.BigDecimal; -import java.util.Date; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.UUID; +import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Symbol; @@ -365,6 +362,24 @@ public MessageAnnotationsBuilder entrySymbol(String key, String value) { return this; } + @Override + public MessageAnnotationsBuilder entry(String key, List list) { + messageAnnotations.put(Symbol.getSymbol(key), list); + return this; + } + + @Override + public MessageAnnotationsBuilder entry(String key, Map map) { + messageAnnotations.put(Symbol.getSymbol(key), map); + return this; + } + + @Override + public MessageAnnotationsBuilder entryArray(String key, Object[] array) { + messageAnnotations.put(Symbol.getSymbol(key), array); + return this; + } + @Override public MessageBuilder messageBuilder() { return messageBuilder; diff --git a/src/main/java/com/rabbitmq/stream/codec/SwiftMqCodec.java b/src/main/java/com/rabbitmq/stream/codec/SwiftMqCodec.java index 0feb5d6837..76f15bd806 100644 --- a/src/main/java/com/rabbitmq/stream/codec/SwiftMqCodec.java +++ b/src/main/java/com/rabbitmq/stream/codec/SwiftMqCodec.java @@ -25,10 +25,9 @@ import com.swiftmq.amqp.v100.types.*; import com.swiftmq.tools.util.DataByteArrayOutputStream; import java.io.IOException; -import java.util.Date; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.UUID; +import java.lang.reflect.Array; +import java.nio.charset.StandardCharsets; +import java.util.*; public class SwiftMqCodec implements Codec { @@ -69,13 +68,48 @@ private static Object convertAmqpMapValue(AMQPType value) { return ((AMQPUuid) value).getValue(); } else if (value instanceof AMQPSymbol) { return ((AMQPSymbol) value).getValue(); + } else if (value instanceof AMQPList) { + try { + List source = ((AMQPList) value).getValue(); + List target = new ArrayList<>(source.size()); + for (AMQPType o : source) { + target.add(convertAmqpMapValue(o)); + } + return target; + } catch (IOException e) { + throw new StreamException("Error while reading SwiftMQ list", e); + } + } else if (value instanceof AMQPMap) { + try { + Map source = ((AMQPMap) value).getValue(); + Map target = new LinkedHashMap<>(source.size()); + for (Map.Entry entry : source.entrySet()) { + target.put(convertAmqpMapValue(entry.getKey()), convertAmqpMapValue(entry.getValue())); + } + return target; + } catch (IOException e) { + throw new StreamException("Error while reading SwiftMQ map", e); + } + } else if (value instanceof AMQPArray) { + try { + AMQPType[] source = ((AMQPArray) value).getValue(); + Object target = + Array.newInstance( + source.length == 0 ? Object.class : convertAmqpMapValue(source[0]).getClass(), + source.length); + for (int i = 0; i < source.length; i++) { + Array.set(target, i, convertAmqpMapValue(source[i])); + } + return target; + } catch (IOException e) { + throw new StreamException("Error while reading SwiftMQ array", e); + } } else if (value instanceof AMQPNull) { return null; } else if (value == null) { return null; } else { - throw new IllegalArgumentException( - "Type not supported for an application property: " + value.getClass()); + throw new IllegalArgumentException("Type not supported: " + value.getClass()); } } @@ -320,11 +354,111 @@ protected static AMQPType convertToSwiftMqType(Object value) { return new AMQPSymbol(value.toString()); } else if (value instanceof UUID) { return new AMQPUuid((UUID) value); + } else if (value instanceof List) { + List source = (List) value; + List target = new ArrayList<>(source.size()); + for (Object o : source) { + target.add(convertToSwiftMqType(o)); + } + try { + return new AMQPList(target); + } catch (IOException e) { + throw new StreamException("Error while creating SwiftMQ list", e); + } + } else if (value instanceof Map) { + Map source = (Map) value; + Map target = new LinkedHashMap<>(source.size()); + for (Map.Entry entry : source.entrySet()) { + target.put(convertToSwiftMqType(entry.getKey()), convertToSwiftMqType(entry.getValue())); + } + try { + return new AMQPMap(target); + } catch (IOException e) { + throw new StreamException("Error while creating SwiftMQ map", e); + } + } else if (value instanceof Object[]) { + Object[] source = (Object[]) value; + AMQPType[] target = new AMQPType[source.length]; + for (int i = 0; i < source.length; i++) { + target[i] = convertToSwiftMqType(source[i]); + } + try { + int code = source.length == 0 ? AMQPTypeDecoder.UNKNOWN : toSwiftMqTypeCode(source[0]); + return new AMQPArray(code, target); + } catch (IOException e) { + throw new StreamException("Error while creating SwiftMQ list", e); + } } else if (value == null) { return AMQPNull.NULL; } else { - throw new IllegalArgumentException( - "Type not supported for an application property: " + value.getClass()); + throw new IllegalArgumentException("Type not supported: " + value.getClass()); + } + } + + protected static int toSwiftMqTypeCode(Object value) { + if (value instanceof Boolean) { + return AMQPTypeDecoder.BOOLEAN; + } else if (value instanceof Byte) { + return AMQPTypeDecoder.BYTE; + } else if (value instanceof Short) { + return AMQPTypeDecoder.SHORT; + } else if (value instanceof Integer) { + int v = (Integer) value; + return (v < -128 || v > 127) ? AMQPTypeDecoder.INT : AMQPTypeDecoder.SINT; + } else if (value instanceof Long) { + long v = (Long) value; + return (v < -128 || v > 127) ? AMQPTypeDecoder.LONG : AMQPTypeDecoder.SLONG; + } else if (value instanceof UnsignedByte) { + return AMQPTypeDecoder.UBYTE; + } else if (value instanceof UnsignedShort) { + return AMQPTypeDecoder.USHORT; + } else if (value instanceof UnsignedInteger) { + return AMQPTypeDecoder.UINT; + } else if (value instanceof UnsignedLong) { + return AMQPTypeDecoder.ULONG; + } else if (value instanceof Float) { + return AMQPTypeDecoder.FLOAT; + } else if (value instanceof Double) { + return AMQPTypeDecoder.DOUBLE; + } else if (value instanceof byte[]) { + return ((byte[]) value).length > 255 ? AMQPTypeDecoder.BIN32 : AMQPTypeDecoder.BIN8; + } else if (value instanceof String) { + return value.toString().getBytes(StandardCharsets.UTF_8).length > 255 + ? AMQPTypeDecoder.STR32UTF8 + : AMQPTypeDecoder.STR8UTF8; + } else if (value instanceof Character) { + return AMQPTypeDecoder.CHAR; + } else if (value instanceof Date) { + return AMQPTypeDecoder.TIMESTAMP; + } else if (value instanceof Symbol) { + return value.toString().getBytes(StandardCharsets.US_ASCII).length > 255 + ? AMQPTypeDecoder.SYM32 + : AMQPTypeDecoder.SYM8; + } else if (value instanceof UUID) { + return AMQPTypeDecoder.UUID; + } else if (value instanceof List) { + List l = (List) value; + if (l.isEmpty()) { + return AMQPTypeDecoder.LIST0; + } else if (l.size() > 255) { + return AMQPTypeDecoder.LIST32; + } else { + return AMQPTypeDecoder.LIST8; + } + } else if (value instanceof Map) { + Map source = (Map) value; + return source.size() * 2 > 255 ? AMQPTypeDecoder.MAP32 : AMQPTypeDecoder.MAP8; + } else if (value instanceof Object[]) { + Object[] source = (Object[]) value; + if (source.length > 255) { + return AMQPTypeDecoder.ARRAY32; + } else { + return AMQPTypeDecoder.ARRAY8; + } + } else if (value == null) { + return AMQPTypeDecoder.NULL; + } else { + throw new IllegalArgumentException("Type not supported: " + value.getClass()); } } diff --git a/src/main/java/com/rabbitmq/stream/codec/SwiftMqMessageBuilder.java b/src/main/java/com/rabbitmq/stream/codec/SwiftMqMessageBuilder.java index 3b873b47e6..9e4e8ea8ee 100644 --- a/src/main/java/com/rabbitmq/stream/codec/SwiftMqMessageBuilder.java +++ b/src/main/java/com/rabbitmq/stream/codec/SwiftMqMessageBuilder.java @@ -14,6 +14,9 @@ // info@rabbitmq.com. package com.rabbitmq.stream.codec; +import static com.rabbitmq.stream.codec.SwiftMqCodec.convertToSwiftMqType; +import static com.rabbitmq.stream.codec.SwiftMqCodec.toSwiftMqTypeCode; + import com.rabbitmq.stream.Message; import com.rabbitmq.stream.MessageBuilder; import com.rabbitmq.stream.StreamException; @@ -23,10 +26,9 @@ import com.swiftmq.amqp.v100.types.*; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.IOException; +import java.lang.reflect.Array; import java.math.BigDecimal; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.UUID; +import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; @@ -321,6 +323,62 @@ protected void addEntry(String key, String value) { protected void addEntrySymbol(String key, String value) { map.put(keyMaker.apply(key), value == null ? AMQPNull.NULL : new AMQPSymbol(value)); } + + protected void addEntry(String key, List list) { + AMQPType amqpValue; + if (list == null) { + amqpValue = AMQPNull.NULL; + } else { + List l = new ArrayList<>(list.size()); + for (Object o : list) { + l.add(convertToSwiftMqType(o)); + } + try { + amqpValue = new AMQPList(l); + } catch (IOException e) { + throw new StreamException("Error while creating SwiftMq list", e); + } + } + map.put(keyMaker.apply(key), amqpValue); + } + + protected void addEntry(String key, Map mapEntry) { + AMQPType amqpValue; + if (mapEntry == null) { + amqpValue = AMQPNull.NULL; + } else { + Map m = new LinkedHashMap<>(mapEntry.size()); + mapEntry.forEach( + (k, v) -> { + m.put(convertToSwiftMqType(k), convertToSwiftMqType(v)); + }); + try { + amqpValue = new AMQPMap(m); + } catch (IOException e) { + throw new StreamException("Error while creating SwiftMQ map", e); + } + } + map.put(keyMaker.apply(key), amqpValue); + } + + protected void addEntry(String key, Object[] array) { + AMQPType amqpValue; + if (array == null) { + amqpValue = AMQPNull.NULL; + } else { + AMQPType[] a = new AMQPType[array.length]; + for (int i = 0; i < array.length; i++) { + a[i] = convertToSwiftMqType(Array.get(array, i)); + } + try { + int code = a.length == 0 ? AMQPTypeDecoder.UNKNOWN : toSwiftMqTypeCode(array[0]); + amqpValue = new AMQPArray(code, a); + } catch (IOException e) { + throw new StreamException("Error while creating SwiftMq list", e); + } + } + map.put(keyMaker.apply(key), amqpValue); + } } private static class SwiftMqApplicationPropertiesBuilder extends AmqpMapBuilderSupport @@ -587,6 +645,24 @@ public MessageAnnotationsBuilder entrySymbol(String key, String value) { return this; } + @Override + public MessageAnnotationsBuilder entry(String key, List list) { + addEntry(key, list); + return this; + } + + @Override + public MessageAnnotationsBuilder entry(String key, Map map) { + addEntry(key, map); + return this; + } + + @Override + public MessageAnnotationsBuilder entryArray(String key, Object[] array) { + addEntry(key, array); + return this; + } + @Override public MessageBuilder messageBuilder() { return messageBuilder; diff --git a/src/main/java/com/rabbitmq/stream/codec/WrapperMessageBuilder.java b/src/main/java/com/rabbitmq/stream/codec/WrapperMessageBuilder.java index 22cd523e67..e1c1afc850 100644 --- a/src/main/java/com/rabbitmq/stream/codec/WrapperMessageBuilder.java +++ b/src/main/java/com/rabbitmq/stream/codec/WrapperMessageBuilder.java @@ -20,10 +20,7 @@ import com.rabbitmq.stream.amqp.*; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.math.BigDecimal; -import java.util.Date; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.UUID; +import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; public class WrapperMessageBuilder implements MessageBuilder { @@ -220,6 +217,24 @@ public MessageAnnotationsBuilder entrySymbol(String key, String value) { return this; } + @Override + public MessageAnnotationsBuilder entry(String key, List list) { + messageAnnotations.put(key, list); + return this; + } + + @Override + public MessageAnnotationsBuilder entry(String key, Map map) { + messageAnnotations.put(key, map); + return this; + } + + @Override + public MessageAnnotationsBuilder entryArray(String key, Object[] array) { + messageAnnotations.put(key, array); + return this; + } + @Override public MessageBuilder messageBuilder() { return this.messageBuilder; diff --git a/src/test/java/com/rabbitmq/stream/codec/CodecsTest.java b/src/test/java/com/rabbitmq/stream/codec/CodecsTest.java index 064e013617..b80acf1b2f 100644 --- a/src/test/java/com/rabbitmq/stream/codec/CodecsTest.java +++ b/src/test/java/com/rabbitmq/stream/codec/CodecsTest.java @@ -33,18 +33,18 @@ import java.math.BigInteger; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.UUID; +import java.util.*; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import java.util.function.UnaryOperator; import java.util.stream.Stream; +import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; import org.assertj.core.api.InstanceOfAssertFactories; import org.assertj.core.api.ThrowableAssert; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; @@ -248,6 +248,22 @@ void codecs(CodecCouple codecCouple) { .entry("annotations.string", string) .entrySymbol("annotations.symbol", symbol) .entry("annotations.null", (String) null) + .entry( + "list", + List.of("1", "2", 3, List.of("1"), Map.of("k1", "v1"), new String[] {"1"})) + .entry( + "map", + Map.of( + "k1", + "v1", + "k2", + List.of("v2"), + "k3", + Map.of("k1", "v1"), + "k4", + new String[] {"1"})) + .entryArray("arrayString", new String[] {"1", "2", "3"}) + .entryArray("arrayInt", new Integer[] {200, 201, 202}) .messageBuilder() .build(); outboundMessage.annotate("extra.annotation", "extra annotation value"); @@ -474,6 +490,35 @@ void codecs(CodecCouple codecCouple) { .isNotNull() .isInstanceOf(String.class) .isEqualTo("extra annotation value"); + + List list = (List) inboundMessage.getMessageAnnotations().get("list"); + assertThat(list.get(0)).isEqualTo("1"); + assertThat(list.get(1)).isEqualTo("2"); + assertThat(list.get(2)).isEqualTo(3); + assertThat(list.get(3)).isEqualTo(List.of("1")); + assertThat(list.get(4)).isEqualTo(Map.of("k1", "v1")); + assertThat(list.get(5)).isEqualTo(new String[] {"1"}); + + Map map = (Map) inboundMessage.getMessageAnnotations().get("map"); + assertThat(map.get("k1")).isEqualTo("v1"); + assertThat(map.get("k2")).isEqualTo(List.of("v2")); + assertThat(map.get("k3")).isEqualTo(Map.of("k1", "v1")); + assertThat(map.get("k4")).isEqualTo(new String[] {"1"}); + + Object[] arrayString = + (Object[]) inboundMessage.getMessageAnnotations().get("arrayString"); + assertThat(arrayString).containsExactly("1", "2", "3"); + int[] arrayInt; + // QPid codec returns int[] and SwiftMQ codec returns Integer[] + if (inboundMessage.getMessageAnnotations().get("arrayInt") instanceof Integer[]) { + arrayInt = + Arrays.stream((Integer[]) inboundMessage.getMessageAnnotations().get("arrayInt")) + .mapToInt(Integer::intValue) + .toArray(); + } else { + arrayInt = (int[]) inboundMessage.getMessageAnnotations().get("arrayInt"); + } + assertThat(arrayInt).containsExactly(200, 201, 202); }); } @@ -624,6 +669,40 @@ void copy(CodecCouple codecCouple) { .containsEntry("copy", "copy value"); } + @Test + void qpidDoesNotSupportPrimitiveArrayEncodingInMap() { + org.apache.qpid.proton.message.Message message = + org.apache.qpid.proton.message.Message.Factory.create(); + Map map = new LinkedHashMap<>(); + map.put(Symbol.valueOf("foo"), new int[] {1, 2, 3}); + message.setMessageAnnotations(new MessageAnnotations(map)); + assertThatThrownBy(() -> qpidEncodeDecode(message)).isInstanceOf(ClassCastException.class); + } + + @Test + void qpidEncodeIntegerArrayDecodeIntArrayInMap() { + org.apache.qpid.proton.message.Message message = + org.apache.qpid.proton.message.Message.Factory.create(); + Map map = new LinkedHashMap<>(); + map.put(Symbol.valueOf("foo"), new Integer[] {1, 2, 3}); + message.setMessageAnnotations(new MessageAnnotations(map)); + message = qpidEncodeDecode(message); + map = message.getMessageAnnotations().getValue(); + assertThat(map.get(Symbol.valueOf("foo"))).isInstanceOf(int[].class); + } + + private static org.apache.qpid.proton.message.Message qpidEncodeDecode( + org.apache.qpid.proton.message.Message in) { + QpidProtonCodec.ByteArrayWritableBuffer writableBuffer = + new QpidProtonCodec.ByteArrayWritableBuffer(8192); + in.encode(writableBuffer); + + org.apache.qpid.proton.message.Message out = + org.apache.qpid.proton.message.Message.Factory.create(); + out.decode(writableBuffer.getArray(), 0, writableBuffer.getArrayLength()); + return out; + } + MessageTestConfiguration test( Function messageOperation, Consumer messageExpectation) {