Skip to content

Commit e826a99

Browse files
committed
KafkaNull batch attempt
1 parent 8886d49 commit e826a99

File tree

1 file changed

+37
-5
lines changed

1 file changed

+37
-5
lines changed

spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/SmartCompositeMessageConverter.java

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,25 @@
1616

1717
package org.springframework.cloud.function.context.config;
1818

19+
import java.lang.reflect.Type;
20+
import java.util.ArrayList;
1921
import java.util.Collection;
2022
import java.util.List;
2123

2224
import org.apache.commons.logging.Log;
2325
import org.apache.commons.logging.LogFactory;
2426

27+
import org.springframework.cloud.function.context.catalog.FunctionTypeUtils;
2528
import org.springframework.lang.Nullable;
2629
import org.springframework.messaging.Message;
2730
import org.springframework.messaging.MessageHeaders;
2831
import org.springframework.messaging.converter.AbstractMessageConverter;
2932
import org.springframework.messaging.converter.CompositeMessageConverter;
3033
import org.springframework.messaging.converter.MessageConverter;
3134
import org.springframework.messaging.converter.SmartMessageConverter;
35+
import org.springframework.messaging.support.MessageBuilder;
3236
import org.springframework.messaging.support.MessageHeaderAccessor;
37+
import org.springframework.util.CollectionUtils;
3338
import org.springframework.util.MimeType;
3439
import org.springframework.util.StringUtils;
3540

@@ -75,11 +80,38 @@ public Object fromMessage(Message<?> message, Class<?> targetClass, @Nullable Ob
7580
if (!(message.getPayload() instanceof byte[]) && targetClass.isInstance(message.getPayload()) && !(message.getPayload() instanceof Collection<?>)) {
7681
return message.getPayload();
7782
}
78-
Object result = (converter instanceof SmartMessageConverter ?
79-
((SmartMessageConverter) converter).fromMessage(message, targetClass, conversionHint) :
80-
converter.fromMessage(message, targetClass));
81-
if (result != null) {
82-
return result;
83+
84+
if (message.getPayload() instanceof Iterable && conversionHint != null) {
85+
Iterable<Object> iterablePayload = (Iterable) message.getPayload();
86+
Type t = FunctionTypeUtils.getImmediateGenericType((Type) conversionHint, 0);
87+
Class rawType = FunctionTypeUtils.getRawType(t);
88+
List<Object> resultList = new ArrayList<>();
89+
for (Object item : iterablePayload) {
90+
/*
91+
* Somewhere here we can do KafkaNull check or see below
92+
*/
93+
Message m = MessageBuilder.withPayload(item).copyHeaders(message.getHeaders()).build();
94+
Object result = (converter instanceof SmartMessageConverter & rawType != t ?
95+
((SmartMessageConverter) converter).fromMessage(m, rawType, t) :
96+
converter.fromMessage(m, rawType));
97+
if (result != null) {
98+
/*
99+
* Or most likely here we can do the KafkaNull check and not add it to the list
100+
*/
101+
resultList.add(result);
102+
}
103+
}
104+
if (!CollectionUtils.isEmpty(resultList)) {
105+
return resultList;
106+
}
107+
}
108+
else {
109+
Object result = (converter instanceof SmartMessageConverter ?
110+
((SmartMessageConverter) converter).fromMessage(message, targetClass, conversionHint) :
111+
converter.fromMessage(message, targetClass));
112+
if (result != null) {
113+
return result;
114+
}
83115
}
84116
}
85117
return null;

0 commit comments

Comments
 (0)