Skip to content

Support map, list, and array types when decoding message annotations #729

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/main/java/com/rabbitmq/stream/MessageBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package com.rabbitmq.stream;

import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/**
Expand Down Expand Up @@ -186,6 +188,12 @@ interface MessageAnnotationsBuilder {

MessageAnnotationsBuilder entrySymbol(String key, String value);

MessageAnnotationsBuilder entry(String key, List<?> list);

MessageAnnotationsBuilder entry(String key, Map<?, ?> map);

MessageAnnotationsBuilder entryArray(String key, Object[] array);

/**
* Go back to the message builder
*
Expand Down
43 changes: 23 additions & 20 deletions src/main/java/com/rabbitmq/stream/codec/QpidProtonCodec.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@
import com.rabbitmq.stream.MessageBuilder;
import com.rabbitmq.stream.Properties;
import java.nio.ByteBuffer;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import java.util.*;
import java.util.function.Function;
import org.apache.qpid.proton.amqp.*;
import org.apache.qpid.proton.amqp.messaging.*;
Expand Down Expand Up @@ -63,15 +60,15 @@ private static <K> Map<String, Object> createMapFromAmqpMap(
if (amqpMap != null) {
result = new LinkedHashMap<>(amqpMap.size());
for (Map.Entry<K, Object> entry : amqpMap.entrySet()) {
result.put(keyMaker.apply(entry.getKey()), convertApplicationProperty(entry.getValue()));
result.put(keyMaker.apply(entry.getKey()), fromQpidToJava(entry.getValue()));
}
} else {
result = null;
}
return result;
}

private static Object convertApplicationProperty(Object value) {
private static Object fromQpidToJava(Object value) {
if (value instanceof Boolean
|| value instanceof Byte
|| value instanceof Short
Expand All @@ -81,7 +78,10 @@ private static Object convertApplicationProperty(Object value) {
|| value instanceof Double
|| value instanceof String
|| value instanceof Character
|| value instanceof UUID) {
|| value instanceof UUID
|| value instanceof List
|| value instanceof Map
|| value instanceof Object[]) {
return value;
} else if (value instanceof Binary) {
return ((Binary) value).getArray();
Expand All @@ -99,9 +99,10 @@ private static Object convertApplicationProperty(Object value) {
return ((Symbol) value).toString();
} else if (value == null) {
return null;
} else if (value.getClass().isArray()) {
return value;
} else {
throw new IllegalArgumentException(
"Type not supported for an application property: " + value.getClass());
throw new IllegalArgumentException("Type not supported: " + value.getClass());
}
}

Expand Down Expand Up @@ -281,7 +282,10 @@ protected Object convertToQpidType(Object value) {
|| value instanceof String
|| value instanceof Character
|| value instanceof UUID
|| value instanceof Date) {
|| value instanceof Date
|| value instanceof List
|| value instanceof Map
|| value instanceof Object[]) {
return value;
} else if (value instanceof com.rabbitmq.stream.amqp.UnsignedByte) {
return UnsignedByte.valueOf(((com.rabbitmq.stream.amqp.UnsignedByte) value).byteValue());
Expand All @@ -298,8 +302,7 @@ protected Object convertToQpidType(Object value) {
} else if (value == null) {
return null;
} else {
throw new IllegalArgumentException(
"Type not supported for an application property: " + value.getClass());
throw new IllegalArgumentException("Type not supported: " + value.getClass());
}
}

Expand Down Expand Up @@ -634,28 +637,28 @@ public Message copy() {

// from
// https://github.com/apache/activemq/blob/master/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpWritableBuffer.java
private static class ByteArrayWritableBuffer implements WritableBuffer {
static class ByteArrayWritableBuffer implements WritableBuffer {

public static final int DEFAULT_CAPACITY = 4 * 1024;
static final int DEFAULT_CAPACITY = 4 * 1024;

byte[] buffer;
int position;
private byte[] buffer;
private int position;

/** Creates a new WritableBuffer with default capacity. */
public ByteArrayWritableBuffer() {
ByteArrayWritableBuffer() {
this(DEFAULT_CAPACITY);
}

/** Create a new WritableBuffer with the given capacity. */
public ByteArrayWritableBuffer(int capacity) {
ByteArrayWritableBuffer(int capacity) {
this.buffer = new byte[capacity];
}

public byte[] getArray() {
byte[] getArray() {
return buffer;
}

public int getArrayLength() {
int getArrayLength() {
return position;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@
import com.rabbitmq.stream.MessageBuilder;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.math.BigDecimal;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
Expand Down Expand Up @@ -365,6 +362,24 @@ public MessageAnnotationsBuilder entrySymbol(String key, String value) {
return this;
}

@Override
public MessageAnnotationsBuilder entry(String key, List<?> list) {
messageAnnotations.put(Symbol.getSymbol(key), list);
return this;
}

@Override
public MessageAnnotationsBuilder entry(String key, Map<?, ?> map) {
messageAnnotations.put(Symbol.getSymbol(key), map);
return this;
}

@Override
public MessageAnnotationsBuilder entryArray(String key, Object[] array) {
messageAnnotations.put(Symbol.getSymbol(key), array);
return this;
}

@Override
public MessageBuilder messageBuilder() {
return messageBuilder;
Expand Down
150 changes: 142 additions & 8 deletions src/main/java/com/rabbitmq/stream/codec/SwiftMqCodec.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@
import com.swiftmq.amqp.v100.types.*;
import com.swiftmq.tools.util.DataByteArrayOutputStream;
import java.io.IOException;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import java.lang.reflect.Array;
import java.nio.charset.StandardCharsets;
import java.util.*;

public class SwiftMqCodec implements Codec {

Expand Down Expand Up @@ -69,13 +68,48 @@ private static Object convertAmqpMapValue(AMQPType value) {
return ((AMQPUuid) value).getValue();
} else if (value instanceof AMQPSymbol) {
return ((AMQPSymbol) value).getValue();
} else if (value instanceof AMQPList) {
try {
List<AMQPType> source = ((AMQPList) value).getValue();
List<Object> target = new ArrayList<>(source.size());
for (AMQPType o : source) {
target.add(convertAmqpMapValue(o));
}
return target;
} catch (IOException e) {
throw new StreamException("Error while reading SwiftMQ list", e);
}
} else if (value instanceof AMQPMap) {
try {
Map<AMQPType, AMQPType> source = ((AMQPMap) value).getValue();
Map<Object, Object> target = new LinkedHashMap<>(source.size());
for (Map.Entry<AMQPType, AMQPType> entry : source.entrySet()) {
target.put(convertAmqpMapValue(entry.getKey()), convertAmqpMapValue(entry.getValue()));
}
return target;
} catch (IOException e) {
throw new StreamException("Error while reading SwiftMQ map", e);
}
} else if (value instanceof AMQPArray) {
try {
AMQPType[] source = ((AMQPArray) value).getValue();
Object target =
Array.newInstance(
source.length == 0 ? Object.class : convertAmqpMapValue(source[0]).getClass(),
source.length);
for (int i = 0; i < source.length; i++) {
Array.set(target, i, convertAmqpMapValue(source[i]));
}
return target;
} catch (IOException e) {
throw new StreamException("Error while reading SwiftMQ array", e);
}
} else if (value instanceof AMQPNull) {
return null;
} else if (value == null) {
return null;
} else {
throw new IllegalArgumentException(
"Type not supported for an application property: " + value.getClass());
throw new IllegalArgumentException("Type not supported: " + value.getClass());
}
}

Expand Down Expand Up @@ -320,11 +354,111 @@ protected static AMQPType convertToSwiftMqType(Object value) {
return new AMQPSymbol(value.toString());
} else if (value instanceof UUID) {
return new AMQPUuid((UUID) value);
} else if (value instanceof List) {
List<?> source = (List<?>) value;
List<AMQPType> target = new ArrayList<>(source.size());
for (Object o : source) {
target.add(convertToSwiftMqType(o));
}
try {
return new AMQPList(target);
} catch (IOException e) {
throw new StreamException("Error while creating SwiftMQ list", e);
}
} else if (value instanceof Map) {
Map<?, ?> source = (Map<?, ?>) value;
Map<AMQPType, AMQPType> target = new LinkedHashMap<>(source.size());
for (Map.Entry<?, ?> entry : source.entrySet()) {
target.put(convertToSwiftMqType(entry.getKey()), convertToSwiftMqType(entry.getValue()));
}
try {
return new AMQPMap(target);
} catch (IOException e) {
throw new StreamException("Error while creating SwiftMQ map", e);
}
} else if (value instanceof Object[]) {
Object[] source = (Object[]) value;
AMQPType[] target = new AMQPType[source.length];
for (int i = 0; i < source.length; i++) {
target[i] = convertToSwiftMqType(source[i]);
}
try {
int code = source.length == 0 ? AMQPTypeDecoder.UNKNOWN : toSwiftMqTypeCode(source[0]);
return new AMQPArray(code, target);
} catch (IOException e) {
throw new StreamException("Error while creating SwiftMQ list", e);
}
} else if (value == null) {
return AMQPNull.NULL;
} else {
throw new IllegalArgumentException(
"Type not supported for an application property: " + value.getClass());
throw new IllegalArgumentException("Type not supported: " + value.getClass());
}
}

protected static int toSwiftMqTypeCode(Object value) {
if (value instanceof Boolean) {
return AMQPTypeDecoder.BOOLEAN;
} else if (value instanceof Byte) {
return AMQPTypeDecoder.BYTE;
} else if (value instanceof Short) {
return AMQPTypeDecoder.SHORT;
} else if (value instanceof Integer) {
int v = (Integer) value;
return (v < -128 || v > 127) ? AMQPTypeDecoder.INT : AMQPTypeDecoder.SINT;
} else if (value instanceof Long) {
long v = (Long) value;
return (v < -128 || v > 127) ? AMQPTypeDecoder.LONG : AMQPTypeDecoder.SLONG;
} else if (value instanceof UnsignedByte) {
return AMQPTypeDecoder.UBYTE;
} else if (value instanceof UnsignedShort) {
return AMQPTypeDecoder.USHORT;
} else if (value instanceof UnsignedInteger) {
return AMQPTypeDecoder.UINT;
} else if (value instanceof UnsignedLong) {
return AMQPTypeDecoder.ULONG;
} else if (value instanceof Float) {
return AMQPTypeDecoder.FLOAT;
} else if (value instanceof Double) {
return AMQPTypeDecoder.DOUBLE;
} else if (value instanceof byte[]) {
return ((byte[]) value).length > 255 ? AMQPTypeDecoder.BIN32 : AMQPTypeDecoder.BIN8;
} else if (value instanceof String) {
return value.toString().getBytes(StandardCharsets.UTF_8).length > 255
? AMQPTypeDecoder.STR32UTF8
: AMQPTypeDecoder.STR8UTF8;
} else if (value instanceof Character) {
return AMQPTypeDecoder.CHAR;
} else if (value instanceof Date) {
return AMQPTypeDecoder.TIMESTAMP;
} else if (value instanceof Symbol) {
return value.toString().getBytes(StandardCharsets.US_ASCII).length > 255
? AMQPTypeDecoder.SYM32
: AMQPTypeDecoder.SYM8;
} else if (value instanceof UUID) {
return AMQPTypeDecoder.UUID;
} else if (value instanceof List) {
List<?> l = (List<?>) value;
if (l.isEmpty()) {
return AMQPTypeDecoder.LIST0;
} else if (l.size() > 255) {
return AMQPTypeDecoder.LIST32;
} else {
return AMQPTypeDecoder.LIST8;
}
} else if (value instanceof Map) {
Map<?, ?> source = (Map<?, ?>) value;
return source.size() * 2 > 255 ? AMQPTypeDecoder.MAP32 : AMQPTypeDecoder.MAP8;
} else if (value instanceof Object[]) {
Object[] source = (Object[]) value;
if (source.length > 255) {
return AMQPTypeDecoder.ARRAY32;
} else {
return AMQPTypeDecoder.ARRAY8;
}
} else if (value == null) {
return AMQPTypeDecoder.NULL;
} else {
throw new IllegalArgumentException("Type not supported: " + value.getClass());
}
}

Expand Down
Loading