Skip to content

Commit e47d1e6

Browse files
committed
Initial commit of KafkaNull changes to SmartCompositeMessageConverter
1 parent e826a99 commit e47d1e6

File tree

1 file changed

+33
-31
lines changed

1 file changed

+33
-31
lines changed

spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/SmartCompositeMessageConverter.java

Lines changed: 33 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.lang.reflect.Type;
2020
import java.util.ArrayList;
2121
import java.util.Collection;
22+
import java.util.Iterator;
2223
import java.util.List;
2324

2425
import org.apache.commons.logging.Log;
@@ -34,7 +35,6 @@
3435
import org.springframework.messaging.converter.SmartMessageConverter;
3536
import org.springframework.messaging.support.MessageBuilder;
3637
import org.springframework.messaging.support.MessageHeaderAccessor;
37-
import org.springframework.util.CollectionUtils;
3838
import org.springframework.util.MimeType;
3939
import org.springframework.util.StringUtils;
4040

@@ -73,48 +73,50 @@ public Object fromMessage(Message<?> message, Class<?> targetClass) {
7373
return null;
7474
}
7575

76+
@SuppressWarnings("unchecked")
7677
@Override
77-
@Nullable
7878
public Object fromMessage(Message<?> message, Class<?> targetClass, @Nullable Object conversionHint) {
79-
for (MessageConverter converter : getConverters()) {
80-
if (!(message.getPayload() instanceof byte[]) && targetClass.isInstance(message.getPayload()) && !(message.getPayload() instanceof Collection<?>)) {
81-
return message.getPayload();
82-
}
83-
84-
if (message.getPayload() instanceof Iterable && conversionHint != null) {
85-
Iterable<Object> iterablePayload = (Iterable) message.getPayload();
86-
Type t = FunctionTypeUtils.getImmediateGenericType((Type) conversionHint, 0);
87-
Class rawType = FunctionTypeUtils.getRawType(t);
88-
List<Object> resultList = new ArrayList<>();
89-
for (Object item : iterablePayload) {
90-
/*
91-
* Somewhere here we can do KafkaNull check or see below
92-
*/
93-
Message m = MessageBuilder.withPayload(item).copyHeaders(message.getHeaders()).build();
94-
Object result = (converter instanceof SmartMessageConverter & rawType != t ?
95-
((SmartMessageConverter) converter).fromMessage(m, rawType, t) :
96-
converter.fromMessage(m, rawType));
97-
if (result != null) {
98-
/*
99-
* Or most likely here we can do the KafkaNull check and not add it to the list
100-
*/
101-
resultList.add(result);
102-
}
79+
if (!(message.getPayload() instanceof byte[]) && targetClass.isInstance(message.getPayload()) && !(message.getPayload() instanceof Collection<?>)) {
80+
return message.getPayload();
81+
}
82+
Object result = null;
83+
if (message.getPayload() instanceof Iterable && conversionHint != null) {
84+
Iterable<Object> iterablePayload = (Iterable<Object>) message.getPayload();
85+
Type genericItemType = FunctionTypeUtils.getImmediateGenericType((Type) conversionHint, 0);
86+
Class<?> genericItemRawType = FunctionTypeUtils.getRawType(genericItemType);
87+
List<Object> resultList = new ArrayList<>();
88+
for (Object item : iterablePayload) {
89+
boolean isConverted = false;
90+
if (item.getClass().getName().startsWith("org.springframework.kafka.support.KafkaNull")) {
91+
resultList.add(item);
92+
isConverted = true;
10393
}
104-
if (!CollectionUtils.isEmpty(resultList)) {
105-
return resultList;
94+
for (Iterator<MessageConverter> iterator = getConverters().iterator(); iterator.hasNext() && !isConverted;) {
95+
Message<?> m = MessageBuilder.withPayload(item).copyHeaders(message.getHeaders()).build(); // TODO Message creating may be expensive
96+
MessageConverter converter = (MessageConverter) iterator.next();
97+
Object conversionResult = (converter instanceof SmartMessageConverter & genericItemRawType != genericItemType ?
98+
((SmartMessageConverter) converter).fromMessage(m, genericItemRawType, genericItemType) :
99+
converter.fromMessage(m, genericItemRawType));
100+
if (conversionResult != null) {
101+
resultList.add(conversionResult);
102+
isConverted = true;
103+
}
106104
}
107105
}
108-
else {
109-
Object result = (converter instanceof SmartMessageConverter ?
106+
result = resultList;
107+
}
108+
else {
109+
for (MessageConverter converter : getConverters()) {
110+
result = (converter instanceof SmartMessageConverter ?
110111
((SmartMessageConverter) converter).fromMessage(message, targetClass, conversionHint) :
111112
converter.fromMessage(message, targetClass));
112113
if (result != null) {
113114
return result;
114115
}
115116
}
116117
}
117-
return null;
118+
119+
return result;
118120
}
119121

120122
@Override

0 commit comments

Comments
 (0)